HDFS-2416. distcp with a webhdfs uri on a secure cluster fails.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1196434 13f79535-47bb-0310-9956-ffa450edef68
Jitendra Nath Pandey 2011-11-02 02:08:20 +00:00
parent 3ad6024a84
commit bd21ddcb78
12 changed files with 278 additions and 31 deletions

CHANGES.txt

@@ -88,6 +88,8 @@ Trunk (unreleased changes)
     HDFS-2526. (Client)NamenodeProtocolTranslatorR23 do not need to keep a
     reference to rpcProxyWithoutRetry (atm)
 
+    HDFS-2416. distcp with a webhdfs uri on a secure cluster fails. (jitendra)
+
 Release 0.23.0 - 2011-11-01
 
   INCOMPATIBLE CHANGES

ByteRangeInputStream.java

@@ -40,13 +40,13 @@ import org.apache.hadoop.hdfs.web.resources.OffsetParam;
 public class ByteRangeInputStream extends FSInputStream {
   /**
-   * This class wraps a URL to allow easy mocking when testing. The URL class
-   * cannot be easily mocked because it is public.
+   * This class wraps a URL and provides method to open connection.
+   * It can be overridden to change how a connection is opened.
    */
-  static class URLOpener {
+  public static class URLOpener {
     protected URL url;
     /** The url with offset parameter */
-    private URL offsetUrl;
+    protected URL offsetUrl;
 
     public URLOpener(URL u) {
       url = u;

@@ -60,7 +60,7 @@ public class ByteRangeInputStream extends FSInputStream {
       return url;
     }
 
-    HttpURLConnection openConnection() throws IOException {
+    protected HttpURLConnection openConnection() throws IOException {
       return (HttpURLConnection)offsetUrl.openConnection();
     }

@@ -125,7 +125,13 @@
     this(new URLOpener(url), new URLOpener(null));
   }
 
-  ByteRangeInputStream(URLOpener o, URLOpener r) {
+  /**
+   * Create with the specified URLOpeners. Original url is used to open the
+   * stream for the first time. Resolved url is used in subsequent requests.
+   * @param o Original url
+   * @param r Resolved url
+   */
+  public ByteRangeInputStream(URLOpener o, URLOpener r) {
     this.originalURL = o;
     this.resolvedURL = r;
   }
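The widened visibility (public class, protected offsetUrl, protected openConnection()) is what lets WebHdfsFileSystem, later in this commit, plug in its own authenticated connections, and it preserves the easy test mocking the old javadoc mentioned. A minimal sketch of such a test double, assuming Mockito on the test classpath and ByteRangeInputStream's package as the imports above suggest:

import java.io.IOException;
import java.net.HttpURLConnection;
import java.net.URL;

import org.apache.hadoop.hdfs.ByteRangeInputStream;

import static org.mockito.Mockito.mock;

// Test double: serves a canned HttpURLConnection instead of touching the
// network. The class name is illustrative, not part of this commit.
class StubURLOpener extends ByteRangeInputStream.URLOpener {
  private final HttpURLConnection stub = mock(HttpURLConnection.class);

  StubURLOpener(URL url) {
    super(url);
  }

  @Override
  protected HttpURLConnection openConnection() throws IOException {
    return stub; // assertions can then run against the mock
  }
}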

JspHelper.java

@@ -53,6 +53,7 @@ import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
 import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
 import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier;
 import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor;
+import org.apache.hadoop.hdfs.server.namenode.NameNode;
 import org.apache.hadoop.hdfs.server.namenode.NameNodeHttpServer;
 import org.apache.hadoop.hdfs.web.resources.DelegationParam;
 import org.apache.hadoop.hdfs.web.resources.UserParam;

@@ -552,6 +553,13 @@ public class JspHelper {
       DataInputStream in = new DataInputStream(buf);
       DelegationTokenIdentifier id = new DelegationTokenIdentifier();
       id.readFields(in);
+      if (context != null) {
+        final NameNode nn = NameNodeHttpServer.getNameNodeFromContext(context);
+        if (nn != null) {
+          // Verify the token.
+          nn.getNamesystem().verifyToken(id, token.getPassword());
+        }
+      }
       ugi = id.getUser();
       checkUsername(ugi.getShortUserName(), usernameFromQuery);
       checkUsername(ugi.getShortUserName(), user);

FSNamesystem.java

@@ -4421,4 +4421,15 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
   public BlockManager getBlockManager() {
     return blockManager;
   }
+
+  /**
+   * Verifies that the given identifier and password are valid and match.
+   * @param identifier Token identifier.
+   * @param password Password in the token.
+   * @throws InvalidToken
+   */
+  public synchronized void verifyToken(DelegationTokenIdentifier identifier,
+      byte[] password) throws InvalidToken {
+    getDelegationTokenSecretManager().verifyToken(identifier, password);
+  }
 }
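For reference, the verification JspHelper performs above reduces to the following sketch (the helper class is illustrative only): decode the identifier from the token's raw bytes, then match it against the token's password.

import java.io.ByteArrayInputStream;
import java.io.DataInputStream;
import java.io.IOException;

import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier;
import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
import org.apache.hadoop.security.token.Token;

class TokenCheckSketch {
  // verifyToken throws SecretManager.InvalidToken (an IOException) on mismatch.
  static void check(FSNamesystem fsn, Token<DelegationTokenIdentifier> token)
      throws IOException {
    DelegationTokenIdentifier id = new DelegationTokenIdentifier();
    id.readFields(new DataInputStream(
        new ByteArrayInputStream(token.getIdentifier())));
    fsn.verifyToken(id, token.getPassword());
  }
}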

NamenodeWebHdfsMethods.java

@@ -68,6 +68,7 @@ import org.apache.hadoop.hdfs.web.resources.AccessTimeParam;
 import org.apache.hadoop.hdfs.web.resources.BlockSizeParam;
 import org.apache.hadoop.hdfs.web.resources.BufferSizeParam;
 import org.apache.hadoop.hdfs.web.resources.DelegationParam;
+import org.apache.hadoop.hdfs.web.resources.TokenArgumentParam;
 import org.apache.hadoop.hdfs.web.resources.DeleteOpParam;
 import org.apache.hadoop.hdfs.web.resources.DestinationParam;
 import org.apache.hadoop.hdfs.web.resources.GetOpParam;

@@ -104,7 +105,7 @@ public class NamenodeWebHdfsMethods {
   public static final Log LOG = LogFactory.getLog(NamenodeWebHdfsMethods.class);
 
   private static final UriFsPathParam ROOT = new UriFsPathParam("");
   private static final ThreadLocal<String> REMOTE_ADDRESS = new ThreadLocal<String>();
 
   /** @return the remote client address. */

@@ -224,11 +225,13 @@ public class NamenodeWebHdfsMethods {
       @QueryParam(AccessTimeParam.NAME) @DefaultValue(AccessTimeParam.DEFAULT)
           final AccessTimeParam accessTime,
       @QueryParam(RenameOptionSetParam.NAME) @DefaultValue(RenameOptionSetParam.DEFAULT)
-          final RenameOptionSetParam renameOptions
+          final RenameOptionSetParam renameOptions,
+      @QueryParam(TokenArgumentParam.NAME) @DefaultValue(TokenArgumentParam.DEFAULT)
+          final TokenArgumentParam delegationTokenArgument
       ) throws IOException, InterruptedException {
     return put(ugi, delegation, ROOT, op, destination, owner, group,
         permission, overwrite, bufferSize, replication, blockSize,
-        modificationTime, accessTime, renameOptions);
+        modificationTime, accessTime, renameOptions, delegationTokenArgument);
   }
 
   /** Handle HTTP PUT request. */

@@ -264,7 +267,9 @@ public class NamenodeWebHdfsMethods {
       @QueryParam(AccessTimeParam.NAME) @DefaultValue(AccessTimeParam.DEFAULT)
           final AccessTimeParam accessTime,
       @QueryParam(RenameOptionSetParam.NAME) @DefaultValue(RenameOptionSetParam.DEFAULT)
-          final RenameOptionSetParam renameOptions
+          final RenameOptionSetParam renameOptions,
+      @QueryParam(TokenArgumentParam.NAME) @DefaultValue(TokenArgumentParam.DEFAULT)
+          final TokenArgumentParam delegationTokenArgument
       ) throws IOException, InterruptedException {
 
     if (LOG.isTraceEnabled()) {

@@ -344,7 +349,7 @@
     case RENEWDELEGATIONTOKEN:
     {
       final Token<DelegationTokenIdentifier> token = new Token<DelegationTokenIdentifier>();
-      token.decodeFromUrlString(delegation.getValue());
+      token.decodeFromUrlString(delegationTokenArgument.getValue());
       final long expiryTime = np.renewDelegationToken(token);
       final String js = JsonUtil.toJsonString("long", expiryTime);
       return Response.ok(js).type(MediaType.APPLICATION_JSON).build();

@@ -352,7 +357,7 @@
     case CANCELDELEGATIONTOKEN:
     {
       final Token<DelegationTokenIdentifier> token = new Token<DelegationTokenIdentifier>();
-      token.decodeFromUrlString(delegation.getValue());
+      token.decodeFromUrlString(delegationTokenArgument.getValue());
       np.cancelDelegationToken(token);
       return Response.ok().type(MediaType.APPLICATION_JSON).build();
     }

AuthFilter.java

@@ -17,11 +17,17 @@
  */
 package org.apache.hadoop.hdfs.web;
 
+import java.io.IOException;
 import java.util.Properties;
 
+import javax.servlet.FilterChain;
 import javax.servlet.FilterConfig;
 import javax.servlet.ServletException;
+import javax.servlet.ServletRequest;
+import javax.servlet.ServletResponse;
+import javax.servlet.http.HttpServletRequest;
 
+import org.apache.hadoop.hdfs.web.resources.DelegationParam;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.authentication.server.AuthenticationFilter;
 import org.apache.hadoop.security.authentication.server.KerberosAuthenticationHandler;

@@ -55,6 +61,21 @@ public class AuthFilter extends AuthenticationFilter {
     p.setProperty(PseudoAuthenticationHandler.ANONYMOUS_ALLOWED, "true");
     //set cookie path
     p.setProperty(COOKIE_PATH, "/");
     return p;
   }
+
+  @Override
+  public void doFilter(ServletRequest request, ServletResponse response,
+      FilterChain filterChain) throws IOException, ServletException {
+    HttpServletRequest httpRequest = (HttpServletRequest) request;
+    String tokenString = httpRequest
+        .getParameter(DelegationParam.NAME);
+    if (tokenString != null) {
+      //Token is present in the url, therefore token will be used for
+      //authentication, bypass kerberos authentication.
+      filterChain.doFilter(httpRequest, response);
+      return;
+    }
+    super.doFilter(request, response, filterChain);
+  }
 }
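A hypothetical test sketch of the bypass, assuming Mockito: a request that already carries the "delegation" query parameter goes straight down the filter chain, so no SPNEGO handshake is attempted here; the token itself is validated later by JspHelper/FSNamesystem.

import javax.servlet.FilterChain;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;

import org.apache.hadoop.hdfs.web.AuthFilter;
import org.apache.hadoop.hdfs.web.resources.DelegationParam;

import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

public class AuthFilterBypassSketch {
  public static void main(String[] args) throws Exception {
    AuthFilter filter = new AuthFilter();
    HttpServletRequest request = mock(HttpServletRequest.class);
    HttpServletResponse response = mock(HttpServletResponse.class);
    FilterChain chain = mock(FilterChain.class);

    when(request.getParameter(DelegationParam.NAME)).thenReturn("<encoded>");
    filter.doFilter(request, response, chain);
    verify(chain).doFilter(request, response); // Kerberos path was skipped
  }
}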

WebHdfsFileSystem.java

@@ -62,6 +62,7 @@ import org.apache.hadoop.hdfs.server.namenode.SafeModeException;
 import org.apache.hadoop.hdfs.web.resources.AccessTimeParam;
 import org.apache.hadoop.hdfs.web.resources.BlockSizeParam;
 import org.apache.hadoop.hdfs.web.resources.BufferSizeParam;
+import org.apache.hadoop.hdfs.web.resources.TokenArgumentParam;
 import org.apache.hadoop.hdfs.web.resources.DeleteOpParam;
 import org.apache.hadoop.hdfs.web.resources.DestinationParam;
 import org.apache.hadoop.hdfs.web.resources.GetOpParam;

@@ -290,24 +291,42 @@ public class WebHdfsFileSystem extends FileSystem
     final String query = op.toQueryString()
         + '&' + new UserParam(ugi)
         + Param.toSortedString("&", parameters);
-    final URL url = getNamenodeURL(path, addDt2Query(query));
+    final URL url;
+    if (op.equals(PutOpParam.Op.RENEWDELEGATIONTOKEN)
+        || op.equals(GetOpParam.Op.GETDELEGATIONTOKEN)) {
+      // Skip adding delegation token for getting or renewing delegation token,
+      // because these operations require kerberos authentication.
+      url = getNamenodeURL(path, query);
+    } else {
+      url = getNamenodeURL(path, addDt2Query(query));
+    }
     if (LOG.isTraceEnabled()) {
       LOG.trace("url=" + url);
     }
     return url;
   }
 
+  private HttpURLConnection getHttpUrlConnection(URL url)
+      throws IOException {
+    final HttpURLConnection conn;
+    try {
+      if (ugi.hasKerberosCredentials()) {
+        conn = new AuthenticatedURL(AUTH).openConnection(url, authToken);
+      } else {
+        conn = (HttpURLConnection)url.openConnection();
+      }
+    } catch (AuthenticationException e) {
+      throw new IOException("Authentication failed, url=" + url, e);
+    }
+    return conn;
+  }
+
   private HttpURLConnection httpConnect(final HttpOpParam.Op op, final Path fspath,
       final Param<?,?>... parameters) throws IOException {
     final URL url = toUrl(op, fspath, parameters);
 
     //connect and get response
-    final HttpURLConnection conn;
-    try {
-      conn = new AuthenticatedURL(AUTH).openConnection(url, authToken);
-    } catch(AuthenticationException e) {
-      throw new IOException("Authentication failed, url=" + url, e);
-    }
+    final HttpURLConnection conn = getHttpUrlConnection(url);
     try {

@@ -317,7 +336,7 @@ public class WebHdfsFileSystem extends FileSystem
       }
       conn.connect();
       return conn;
-    } catch(IOException e) {
+    } catch (IOException e) {
       conn.disconnect();
       throw e;
     }

@@ -513,7 +532,24 @@ public class WebHdfsFileSystem extends FileSystem
     statistics.incrementReadOps(1);
     final HttpOpParam.Op op = GetOpParam.Op.OPEN;
     final URL url = toUrl(op, f, new BufferSizeParam(buffersize));
-    return new FSDataInputStream(new ByteRangeInputStream(url));
+    ByteRangeInputStream str = getByteRangeInputStream(url);
+    return new FSDataInputStream(str);
+  }
+
+  private class URLOpener extends ByteRangeInputStream.URLOpener {
+    public URLOpener(URL u) {
+      super(u);
+    }
+
+    @Override
+    public HttpURLConnection openConnection() throws IOException {
+      return getHttpUrlConnection(offsetUrl);
+    }
+  }
+
+  private ByteRangeInputStream getByteRangeInputStream(URL url) {
+    return new ByteRangeInputStream(new URLOpener(url), new URLOpener(null));
   }
 
   @Override

@@ -576,17 +612,19 @@ public class WebHdfsFileSystem extends FileSystem
   private synchronized long renewDelegationToken(final Token<?> token
       ) throws IOException {
-    delegationToken = token;
     final HttpOpParam.Op op = PutOpParam.Op.RENEWDELEGATIONTOKEN;
-    final Map<?, ?> m = run(op, null);
+    TokenArgumentParam dtargParam = new TokenArgumentParam(
+        token.encodeToUrlString());
+    final Map<?, ?> m = run(op, null, dtargParam);
     return (Long) m.get("long");
   }
 
   private synchronized void cancelDelegationToken(final Token<?> token
       ) throws IOException {
-    delegationToken = token;
     final HttpOpParam.Op op = PutOpParam.Op.CANCELDELEGATIONTOKEN;
-    run(op, null);
+    TokenArgumentParam dtargParam = new TokenArgumentParam(
+        token.encodeToUrlString());
+    run(op, null, dtargParam);
   }
 
   @Override
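The net effect on the request URLs, sketched with placeholder values (compare the assertions in TestWebHdfsUrl at the end of this commit): renew and get-token operations authenticate over Kerberos and carry the target token only as the "token" argument, while other operations, including cancel, still get the authenticating "delegation" parameter appended.

public class QueryShapeSketch {
  public static void main(String[] args) {
    String t = "<encoded-token>";
    // RENEWDELEGATIONTOKEN / GETDELEGATIONTOKEN: no "delegation" parameter.
    System.out.println("op=RENEWDELEGATIONTOKEN&user.name=alice&token=" + t);
    // Ordinary ops: the delegation token authenticates the request itself.
    System.out.println("op=OPEN&user.name=alice&delegation=" + t);
  }
}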

DelegationParam.java

@@ -19,7 +19,7 @@ package org.apache.hadoop.hdfs.web.resources;
 
 import org.apache.hadoop.security.UserGroupInformation;
 
-/** Delegation token parameter. */
+/** Represents delegation token used for authentication. */
 public class DelegationParam extends StringParam {
   /** Parameter name. */
   public static final String NAME = "delegation";

TokenArgumentParam.java (new file)

@@ -0,0 +1,44 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.web.resources;
+
+/**
+ * Represents delegation token parameter as method arguments. This is
+ * different from {@link DelegationParam}.
+ */
+public class TokenArgumentParam extends StringParam {
+  /** Parameter name. */
+  public static final String NAME = "token";
+
+  /** Default parameter value. */
+  public static final String DEFAULT = "";
+
+  private static final Domain DOMAIN = new Domain(NAME, null);
+
+  /**
+   * Constructor.
+   * @param str A string representation of the parameter value.
+   */
+  public TokenArgumentParam(final String str) {
+    super(DOMAIN, str != null && !str.equals(DEFAULT) ? str : null);
+  }
+
+  @Override
+  public String getName() {
+    return NAME;
+  }
+}
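A small sketch contrasting the two parameter names (values are placeholders): "delegation" authenticates the caller, while "token" names the token that a renew or cancel operation acts on.

import org.apache.hadoop.hdfs.web.resources.DelegationParam;
import org.apache.hadoop.hdfs.web.resources.TokenArgumentParam;

public class ParamContrastSketch {
  public static void main(String[] args) {
    System.out.println(DelegationParam.NAME);    // prints "delegation"
    System.out.println(TokenArgumentParam.NAME); // prints "token"
    // An empty or null argument maps to the absent (null) value.
    System.out.println(new TokenArgumentParam("").getValue()); // prints "null"
  }
}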

UserProvider.java

@@ -50,7 +50,7 @@ public class UserProvider
     final Configuration conf = (Configuration) servletcontext
         .getAttribute(JspHelper.CURRENT_CONF);
     try {
-      return JspHelper.getUGI(null, request, conf,
+      return JspHelper.getUGI(servletcontext, request, conf,
           AuthenticationMethod.KERBEROS, false);
     } catch (IOException e) {
       throw new RuntimeException(e);

TestDelegationToken.java

@@ -148,7 +148,7 @@ public class TestDelegationToken {
   @Test
   public void testDelegationTokenDFSApi() throws Exception {
     DistributedFileSystem dfs = (DistributedFileSystem) cluster.getFileSystem();
-    Token<DelegationTokenIdentifier> token = dfs.getDelegationToken("JobTracker");
+    final Token<DelegationTokenIdentifier> token = dfs.getDelegationToken("JobTracker");
     DelegationTokenIdentifier identifier = new DelegationTokenIdentifier();
     byte[] tokenId = token.getIdentifier();
     identifier.readFields(new DataInputStream(

@@ -156,6 +156,15 @@ public class TestDelegationToken {
     LOG.info("A valid token should have non-null password, and should be renewed successfully");
     Assert.assertTrue(null != dtSecretManager.retrievePassword(identifier));
     dtSecretManager.renewToken(token, "JobTracker");
+    UserGroupInformation.createRemoteUser("JobTracker").doAs(
+        new PrivilegedExceptionAction<Object>() {
+          @Override
+          public Object run() throws Exception {
+            token.renew(config);
+            token.cancel(config);
+            return null;
+          }
+        });
   }
 
   @Test

@@ -174,13 +183,23 @@
       }
     });
 
-    final Token<DelegationTokenIdentifier> token = webhdfs.getDelegationToken("JobTracker");
+    final Token<DelegationTokenIdentifier> token = webhdfs
+        .getDelegationToken("JobTracker");
     DelegationTokenIdentifier identifier = new DelegationTokenIdentifier();
     byte[] tokenId = token.getIdentifier();
-    identifier.readFields(new DataInputStream(new ByteArrayInputStream(tokenId)));
+    identifier
+        .readFields(new DataInputStream(new ByteArrayInputStream(tokenId)));
     LOG.info("A valid token should have non-null password, and should be renewed successfully");
     Assert.assertTrue(null != dtSecretManager.retrievePassword(identifier));
     dtSecretManager.renewToken(token, "JobTracker");
+    ugi.doAs(new PrivilegedExceptionAction<Object>() {
+      @Override
+      public Object run() throws Exception {
+        token.renew(config);
+        token.cancel(config);
+        return null;
+      }
+    });
   }
 
   @SuppressWarnings("deprecation")

TestWebHdfsUrl.java (new file)

@@ -0,0 +1,93 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.web;
+
+import java.io.IOException;
+import java.net.URI;
+import java.net.URL;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.CommonConfigurationKeys;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier;
+import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenSecretManager;
+import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
+import org.apache.hadoop.hdfs.web.resources.DelegationParam;
+import org.apache.hadoop.hdfs.web.resources.TokenArgumentParam;
+import org.apache.hadoop.hdfs.web.resources.HttpOpParam;
+import org.apache.hadoop.hdfs.web.resources.PutOpParam;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.token.Token;
+import org.junit.Assert;
+import org.junit.Test;
+
+import static org.mockito.Mockito.mock;
+
+public class TestWebHdfsUrl {
+
+  @Test
+  public void testDelegationTokenInUrl() throws IOException {
+    Configuration conf = new Configuration();
+    final String uri = WebHdfsFileSystem.SCHEME + "://" + "127.0.0.1:9071";
+    // Turn on security
+    conf.set(CommonConfigurationKeys.HADOOP_SECURITY_AUTHENTICATION, "kerberos");
+    UserGroupInformation.setConfiguration(conf);
+    UserGroupInformation ugi = UserGroupInformation.getCurrentUser();
+    DelegationTokenIdentifier dtId = new DelegationTokenIdentifier(new Text(
+        ugi.getUserName()), null, null);
+    FSNamesystem namesystem = mock(FSNamesystem.class);
+    DelegationTokenSecretManager dtSecretManager = new DelegationTokenSecretManager(
+        86400000, 86400000, 86400000, 86400000, namesystem);
+    dtSecretManager.startThreads();
+    Token<DelegationTokenIdentifier> token = new Token<DelegationTokenIdentifier>(
+        dtId, dtSecretManager);
+    token.setService(new Text("127.0.0.1:9071"));
+    token.setKind(WebHdfsFileSystem.TOKEN_KIND);
+    ugi.addToken(token);
+
+    final WebHdfsFileSystem webhdfs = (WebHdfsFileSystem) FileSystem.get(
+        URI.create(uri), conf);
+    String tokenString = token.encodeToUrlString();
+    Path fsPath = new Path("/");
+    URL renewTokenUrl = webhdfs.toUrl(PutOpParam.Op.RENEWDELEGATIONTOKEN,
+        fsPath, new TokenArgumentParam(tokenString));
+    URL cancelTokenUrl = webhdfs.toUrl(PutOpParam.Op.CANCELDELEGATIONTOKEN,
+        fsPath, new TokenArgumentParam(tokenString));
+    Assert.assertEquals(
+        generateUrlQueryPrefix(PutOpParam.Op.RENEWDELEGATIONTOKEN,
+            ugi.getUserName())
+            + "&token=" + tokenString, renewTokenUrl.getQuery());
+    Token<DelegationTokenIdentifier> delegationToken = new Token<DelegationTokenIdentifier>(
+        token);
+    delegationToken.setKind(DelegationTokenIdentifier.HDFS_DELEGATION_KIND);
+    Assert.assertEquals(
+        generateUrlQueryPrefix(PutOpParam.Op.CANCELDELEGATIONTOKEN,
+            ugi.getUserName())
+            + "&token="
+            + tokenString
+            + "&"
+            + DelegationParam.NAME
+            + "="
+            + delegationToken.encodeToUrlString(), cancelTokenUrl.getQuery());
+  }
+
+  private String generateUrlQueryPrefix(HttpOpParam.Op op, String username) {
+    return "op=" + op.toString() + "&user.name=" + username;
+  }
+}