HDFS-2652. Add support for host-based delegation tokens. Contributed by Daryn Sharp

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1327309 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Tsz-wo Sze 2012-04-17 22:21:33 +00:00
parent a15e69cb9f
commit c80dbe5e09
11 changed files with 266 additions and 79 deletions

View File

@ -860,6 +860,9 @@ Release 0.23.3 - UNRELEASED
HDFS-3176. Use MD5MD5CRC32FileChecksum.readFields() in JsonUtil . (Kihwal HDFS-3176. Use MD5MD5CRC32FileChecksum.readFields() in JsonUtil . (Kihwal
Lee via szetszwo) Lee via szetszwo)
HDFS-2652. Add support for host-based delegation tokens. (Daryn Sharp via
szetszwo)
Release 0.23.2 - UNRELEASED Release 0.23.2 - UNRELEASED
INCOMPATIBLE CHANGES INCOMPATIBLE CHANGES

View File

@ -52,6 +52,9 @@ public class HAUtil {
private static final Log LOG = private static final Log LOG =
LogFactory.getLog(HAUtil.class); LogFactory.getLog(HAUtil.class);
private static final DelegationTokenSelector tokenSelector =
new DelegationTokenSelector();
private HAUtil() { /* Hidden constructor */ } private HAUtil() { /* Hidden constructor */ }
/** /**
@ -241,25 +244,28 @@ public static boolean isTokenForLogicalUri(
* one is found, clone it to also represent the underlying namenode address. * one is found, clone it to also represent the underlying namenode address.
* @param ugi the UGI to modify * @param ugi the UGI to modify
* @param haUri the logical URI for the cluster * @param haUri the logical URI for the cluster
* @param singleNNAddr one of the NNs in the cluster to which the token * @param nnAddrs collection of NNs in the cluster to which the token
* applies * applies
*/ */
public static void cloneDelegationTokenForLogicalUri( public static void cloneDelegationTokenForLogicalUri(
UserGroupInformation ugi, URI haUri, UserGroupInformation ugi, URI haUri,
InetSocketAddress singleNNAddr) { Collection<InetSocketAddress> nnAddrs) {
Text haService = buildTokenServiceForLogicalUri(haUri); Text haService = HAUtil.buildTokenServiceForLogicalUri(haUri);
Token<DelegationTokenIdentifier> haToken = Token<DelegationTokenIdentifier> haToken =
DelegationTokenSelector.selectHdfsDelegationToken(haService, ugi); tokenSelector.selectToken(haService, ugi.getTokens());
if (haToken == null) { if (haToken != null) {
// no token for (InetSocketAddress singleNNAddr : nnAddrs) {
return; Token<DelegationTokenIdentifier> specificToken =
new Token<DelegationTokenIdentifier>(haToken);
SecurityUtil.setTokenService(specificToken, singleNNAddr);
ugi.addToken(specificToken);
LOG.debug("Mapped HA service delegation token for logical URI " +
haUri + " to namenode " + singleNNAddr);
}
} else {
LOG.debug("No HA service delegation token found for logical URI " +
haUri);
} }
Token<DelegationTokenIdentifier> specificToken =
new Token<DelegationTokenIdentifier>(haToken);
specificToken.setService(SecurityUtil.buildTokenService(singleNNAddr));
ugi.addToken(specificToken);
LOG.debug("Mapped HA service delegation token for logical URI " +
haUri + " to namenode " + singleNNAddr);
} }
/** /**

View File

@ -30,6 +30,7 @@
import java.text.ParseException; import java.text.ParseException;
import java.text.SimpleDateFormat; import java.text.SimpleDateFormat;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Collection;
import java.util.TimeZone; import java.util.TimeZone;
import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceAudience;
@ -48,7 +49,6 @@
import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenRenewer; import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenRenewer;
import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenSelector; import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenSelector;
import org.apache.hadoop.hdfs.server.common.JspHelper; import org.apache.hadoop.hdfs.server.common.JspHelper;
import org.apache.hadoop.hdfs.server.namenode.NameNode;
import org.apache.hadoop.hdfs.tools.DelegationTokenFetcher; import org.apache.hadoop.hdfs.tools.DelegationTokenFetcher;
import org.apache.hadoop.hdfs.web.URLUtils; import org.apache.hadoop.hdfs.web.URLUtils;
import org.apache.hadoop.io.Text; import org.apache.hadoop.io.Text;
@ -168,10 +168,7 @@ public void initialize(final URI name, final Configuration conf)
protected void initDelegationToken() throws IOException { protected void initDelegationToken() throws IOException {
// look for hftp token, then try hdfs // look for hftp token, then try hdfs
Token<?> token = selectHftpDelegationToken(); Token<?> token = selectDelegationToken();
if (token == null) {
token = selectHdfsDelegationToken();
}
// if we don't already have a token, go get one over https // if we don't already have a token, go get one over https
boolean createdToken = false; boolean createdToken = false;
@ -192,14 +189,8 @@ protected void initDelegationToken() throws IOException {
} }
} }
protected Token<DelegationTokenIdentifier> selectHftpDelegationToken() { protected Token<DelegationTokenIdentifier> selectDelegationToken() {
Text serviceName = SecurityUtil.buildTokenService(nnSecureAddr); return hftpTokenSelector.selectToken(getUri(), ugi.getTokens(), getConf());
return hftpTokenSelector.selectToken(serviceName, ugi.getTokens());
}
protected Token<DelegationTokenIdentifier> selectHdfsDelegationToken() {
return DelegationTokenSelector.selectHdfsDelegationToken(
nnAddr, ugi, getConf());
} }
@ -699,9 +690,22 @@ public void cancel(Token<?> token,
private static class HftpDelegationTokenSelector private static class HftpDelegationTokenSelector
extends AbstractDelegationTokenSelector<DelegationTokenIdentifier> { extends AbstractDelegationTokenSelector<DelegationTokenIdentifier> {
private static final DelegationTokenSelector hdfsTokenSelector =
new DelegationTokenSelector();
public HftpDelegationTokenSelector() { public HftpDelegationTokenSelector() {
super(TOKEN_KIND); super(TOKEN_KIND);
} }
Token<DelegationTokenIdentifier> selectToken(URI nnUri,
Collection<Token<?>> tokens, Configuration conf) {
Token<DelegationTokenIdentifier> token =
selectToken(SecurityUtil.buildTokenService(nnUri), tokens);
if (token == null) {
// try to get a HDFS token
token = hdfsTokenSelector.selectToken(nnUri, tokens, conf);
}
return token;
}
} }
} }

View File

@ -17,7 +17,8 @@
*/ */
package org.apache.hadoop.hdfs.security.token.delegation; package org.apache.hadoop.hdfs.security.token.delegation;
import java.net.InetSocketAddress; import java.net.URI;
import java.util.Collection;
import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
@ -25,7 +26,6 @@
import org.apache.hadoop.io.Text; import org.apache.hadoop.io.Text;
import org.apache.hadoop.net.NetUtils; import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.security.SecurityUtil; import org.apache.hadoop.security.SecurityUtil;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.Token; import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.security.token.delegation.AbstractDelegationTokenSelector; import org.apache.hadoop.security.token.delegation.AbstractDelegationTokenSelector;
@ -37,32 +37,35 @@ public class DelegationTokenSelector
extends AbstractDelegationTokenSelector<DelegationTokenIdentifier>{ extends AbstractDelegationTokenSelector<DelegationTokenIdentifier>{
public static final String SERVICE_NAME_KEY = "hdfs.service.host_"; public static final String SERVICE_NAME_KEY = "hdfs.service.host_";
private static final DelegationTokenSelector INSTANCE = new DelegationTokenSelector(); /**
* Select the delegation token for hdfs. The port will be rewritten to
/** Select the delegation token for hdfs from the ugi. */ * the port of hdfs.service.host_$nnAddr, or the default rpc namenode port.
public static Token<DelegationTokenIdentifier> selectHdfsDelegationToken( * This method should only be called by non-hdfs filesystems that do not
final InetSocketAddress nnAddr, final UserGroupInformation ugi, * use the rpc port to acquire tokens. Ex. webhdfs, hftp
* @param nnUri of the remote namenode
* @param tokens as a collection
* @param conf hadoop configuration
* @return Token
*/
public Token<DelegationTokenIdentifier> selectToken(
final URI nnUri, Collection<Token<?>> tokens,
final Configuration conf) { final Configuration conf) {
// this guesses the remote cluster's rpc service port. // this guesses the remote cluster's rpc service port.
// the current token design assumes it's the same as the local cluster's // the current token design assumes it's the same as the local cluster's
// rpc port unless a config key is set. there should be a way to automatic // rpc port unless a config key is set. there should be a way to automatic
// and correctly determine the value // and correctly determine the value
final String key = SERVICE_NAME_KEY + SecurityUtil.buildTokenService(nnAddr); Text serviceName = SecurityUtil.buildTokenService(nnUri);
final String nnServiceName = conf.get(key); final String nnServiceName = conf.get(SERVICE_NAME_KEY + serviceName);
int nnRpcPort = NameNode.DEFAULT_PORT; int nnRpcPort = NameNode.DEFAULT_PORT;
if (nnServiceName != null) { if (nnServiceName != null) {
nnRpcPort = NetUtils.createSocketAddr(nnServiceName, nnRpcPort).getPort(); nnRpcPort = NetUtils.createSocketAddr(nnServiceName, nnRpcPort).getPort();
} }
// use original hostname from the uri to avoid unintentional host resolving
serviceName = SecurityUtil.buildTokenService(
NetUtils.createSocketAddrForHost(nnUri.getHost(), nnRpcPort));
final Text serviceName = SecurityUtil.buildTokenService( return selectToken(serviceName, tokens);
new InetSocketAddress(nnAddr.getHostName(), nnRpcPort));
return INSTANCE.selectToken(serviceName, ugi.getTokens());
}
public static Token<DelegationTokenIdentifier> selectHdfsDelegationToken(
Text serviceName, UserGroupInformation ugi) {
return INSTANCE.selectToken(serviceName, ugi.getTokens());
} }
public DelegationTokenSelector() { public DelegationTokenSelector() {

View File

@ -22,6 +22,7 @@
import java.net.InetSocketAddress; import java.net.InetSocketAddress;
import java.net.URI; import java.net.URI;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Collection;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
@ -93,14 +94,15 @@ public ConfiguredFailoverProxyProvider(Configuration conf, URI uri,
"for URI " + uri); "for URI " + uri);
} }
for (InetSocketAddress address : addressesInNN.values()) { Collection<InetSocketAddress> addressesOfNns = addressesInNN.values();
for (InetSocketAddress address : addressesOfNns) {
proxies.add(new AddressRpcProxyPair<T>(address)); proxies.add(new AddressRpcProxyPair<T>(address));
// The client may have a delegation token set for the logical
// URI of the cluster. Clone this token to apply to each of the
// underlying IPC addresses so that the IPC code can find it.
HAUtil.cloneDelegationTokenForLogicalUri(ugi, uri, address);
} }
// The client may have a delegation token set for the logical
// URI of the cluster. Clone this token to apply to each of the
// underlying IPC addresses so that the IPC code can find it.
HAUtil.cloneDelegationTokenForLogicalUri(ugi, uri, addressesOfNns);
} catch (IOException e) { } catch (IOException e) {
throw new RuntimeException(e); throw new RuntimeException(e);
} }

View File

@ -29,6 +29,7 @@
import java.net.URI; import java.net.URI;
import java.net.URISyntaxException; import java.net.URISyntaxException;
import java.net.URL; import java.net.URL;
import java.util.Collection;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.StringTokenizer; import java.util.StringTokenizer;
@ -117,8 +118,8 @@ public class WebHdfsFileSystem extends FileSystem
/** Delegation token kind */ /** Delegation token kind */
public static final Text TOKEN_KIND = new Text("WEBHDFS delegation"); public static final Text TOKEN_KIND = new Text("WEBHDFS delegation");
/** Token selector */ /** Token selector */
public static final AbstractDelegationTokenSelector<DelegationTokenIdentifier> DT_SELECTOR public static final WebHdfsDelegationTokenSelector DT_SELECTOR
= new AbstractDelegationTokenSelector<DelegationTokenIdentifier>(TOKEN_KIND) {}; = new WebHdfsDelegationTokenSelector();
private static DelegationTokenRenewer<WebHdfsFileSystem> DT_RENEWER = null; private static DelegationTokenRenewer<WebHdfsFileSystem> DT_RENEWER = null;
@ -164,7 +165,7 @@ public synchronized void initialize(URI uri, Configuration conf
} catch (URISyntaxException e) { } catch (URISyntaxException e) {
throw new IllegalArgumentException(e); throw new IllegalArgumentException(e);
} }
this.nnAddr = NetUtils.createSocketAddr(uri.toString()); this.nnAddr = NetUtils.createSocketAddrForHost(uri.getHost(), uri.getPort());
this.workingDir = getHomeDirectory(); this.workingDir = getHomeDirectory();
if (UserGroupInformation.isSecurityEnabled()) { if (UserGroupInformation.isSecurityEnabled()) {
@ -174,12 +175,7 @@ public synchronized void initialize(URI uri, Configuration conf
protected void initDelegationToken() throws IOException { protected void initDelegationToken() throws IOException {
// look for webhdfs token, then try hdfs // look for webhdfs token, then try hdfs
final Text serviceName = SecurityUtil.buildTokenService(nnAddr); Token<?> token = selectDelegationToken();
Token<?> token = DT_SELECTOR.selectToken(serviceName, ugi.getTokens());
if (token == null) {
token = DelegationTokenSelector.selectHdfsDelegationToken(
nnAddr, ugi, getConf());
}
//since we don't already have a token, go get one //since we don't already have a token, go get one
boolean createdToken = false; boolean createdToken = false;
@ -200,6 +196,10 @@ protected void initDelegationToken() throws IOException {
} }
} }
protected Token<DelegationTokenIdentifier> selectDelegationToken() {
return DT_SELECTOR.selectToken(getUri(), ugi.getTokens(), getConf());
}
@Override @Override
protected int getDefaultPort() { protected int getDefaultPort() {
return getConf().getInt(DFSConfigKeys.DFS_NAMENODE_HTTP_PORT_KEY, return getConf().getInt(DFSConfigKeys.DFS_NAMENODE_HTTP_PORT_KEY,
@ -845,4 +845,24 @@ public void cancel(final Token<?> token, final Configuration conf
} }
} }
} }
private static class WebHdfsDelegationTokenSelector
extends AbstractDelegationTokenSelector<DelegationTokenIdentifier> {
private static final DelegationTokenSelector hdfsTokenSelector =
new DelegationTokenSelector();
public WebHdfsDelegationTokenSelector() {
super(TOKEN_KIND);
}
Token<DelegationTokenIdentifier> selectToken(URI nnUri,
Collection<Token<?>> tokens, Configuration conf) {
Token<DelegationTokenIdentifier> token =
selectToken(SecurityUtil.buildTokenService(nnUri), tokens);
if (token == null) {
token = hdfsTokenSelector.selectToken(nnUri, tokens, conf);
}
return token;
}
}
} }

View File

@ -94,6 +94,7 @@
import org.apache.hadoop.net.DNSToSwitchMapping; import org.apache.hadoop.net.DNSToSwitchMapping;
import org.apache.hadoop.net.NetUtils; import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.net.StaticMapping; import org.apache.hadoop.net.StaticMapping;
import org.apache.hadoop.security.SecurityUtil;
import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.authorize.ProxyUsers; import org.apache.hadoop.security.authorize.ProxyUsers;
import org.apache.hadoop.util.StringUtils; import org.apache.hadoop.util.StringUtils;
@ -1049,16 +1050,14 @@ public synchronized void startDataNodes(Configuration conf, int numDataNodes,
if(dn == null) if(dn == null)
throw new IOException("Cannot start DataNode in " throw new IOException("Cannot start DataNode in "
+ dnConf.get(DFS_DATANODE_DATA_DIR_KEY)); + dnConf.get(DFS_DATANODE_DATA_DIR_KEY));
//NOTE: the following is true if and only if: //since the HDFS does things based on host|ip:port, we need to add the
// hadoop.security.token.service.use_ip=true //mapping for the service to rackId
//since the HDFS does things based on IP:port, we need to add the mapping String service =
//for IP:port to rackId SecurityUtil.buildTokenService(dn.getXferAddress()).toString();
String ipAddr = dn.getXferAddress().getAddress().getHostAddress();
if (racks != null) { if (racks != null) {
int port = dn.getXferAddress().getPort(); LOG.info("Adding node with service : " + service +
LOG.info("Adding node with IP:port : " + ipAddr + ":" + port +
" to rack " + racks[i-curDatanodesNum]); " to rack " + racks[i-curDatanodesNum]);
StaticMapping.addNodeToRack(ipAddr + ":" + port, StaticMapping.addNodeToRack(service,
racks[i-curDatanodesNum]); racks[i-curDatanodesNum]);
} }
dn.runDatanodeDaemon(); dn.runDatanodeDaemon();

View File

@ -31,6 +31,7 @@
import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier; import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier;
import org.apache.hadoop.io.Text; import org.apache.hadoop.io.Text;
import org.apache.hadoop.security.SecurityUtilTestHelper;
import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.Token; import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.security.token.TokenIdentifier; import org.apache.hadoop.security.token.TokenIdentifier;
@ -66,4 +67,59 @@ public FileSystem run() throws Exception {
renewToken.setAccessible(true); renewToken.setAccessible(true);
assertSame("wrong token", token, renewToken.get(fs)); assertSame("wrong token", token, renewToken.get(fs));
} }
@Test
public void testSelectHdfsDelegationToken() throws Exception {
SecurityUtilTestHelper.setTokenServiceUseIp(true);
Configuration conf = new Configuration();
URI hftpUri = URI.create("hftp://localhost:0");
UserGroupInformation ugi = UserGroupInformation.getCurrentUser();
Token<?> token = null;
// test fallback to hdfs token
Token<?> hdfsToken = new Token<TokenIdentifier>(
new byte[0], new byte[0],
DelegationTokenIdentifier.HDFS_DELEGATION_KIND,
new Text("127.0.0.1:8020"));
ugi.addToken(hdfsToken);
HftpFileSystem fs = (HftpFileSystem) FileSystem.get(hftpUri, conf);
token = fs.selectDelegationToken();
assertNotNull(token);
assertEquals(hdfsToken, token);
// test hftp is favored over hdfs
Token<?> hftpToken = new Token<TokenIdentifier>(
new byte[0], new byte[0],
HftpFileSystem.TOKEN_KIND, new Text("127.0.0.1:0"));
ugi.addToken(hftpToken);
token = fs.selectDelegationToken();
assertNotNull(token);
assertEquals(hftpToken, token);
// switch to using host-based tokens, no token should match
SecurityUtilTestHelper.setTokenServiceUseIp(false);
token = fs.selectDelegationToken();
assertNull(token);
// test fallback to hdfs token
hdfsToken = new Token<TokenIdentifier>(
new byte[0], new byte[0],
DelegationTokenIdentifier.HDFS_DELEGATION_KIND,
new Text("localhost:8020"));
ugi.addToken(hdfsToken);
token = fs.selectDelegationToken();
assertNotNull(token);
assertEquals(hdfsToken, token);
// test hftp is favored over hdfs
hftpToken = new Token<TokenIdentifier>(
new byte[0], new byte[0],
HftpFileSystem.TOKEN_KIND, new Text("localhost:0"));
ugi.addToken(hftpToken);
token = fs.selectDelegationToken();
assertNotNull(token);
assertEquals(hftpToken, token);
}
} }

View File

@ -41,6 +41,7 @@
import org.apache.hadoop.security.SaslInputStream; import org.apache.hadoop.security.SaslInputStream;
import org.apache.hadoop.security.SaslRpcClient; import org.apache.hadoop.security.SaslRpcClient;
import org.apache.hadoop.security.SaslRpcServer; import org.apache.hadoop.security.SaslRpcServer;
import org.apache.hadoop.security.SecurityUtil;
import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.Token; import org.apache.hadoop.security.token.Token;
import org.apache.log4j.Level; import org.apache.log4j.Level;
@ -91,10 +92,8 @@ public void testDelegationTokenRpc() throws Exception {
DelegationTokenIdentifier dtId = new DelegationTokenIdentifier(owner, owner, null); DelegationTokenIdentifier dtId = new DelegationTokenIdentifier(owner, owner, null);
Token<DelegationTokenIdentifier> token = new Token<DelegationTokenIdentifier>( Token<DelegationTokenIdentifier> token = new Token<DelegationTokenIdentifier>(
dtId, sm); dtId, sm);
Text host = new Text(addr.getAddress().getHostAddress() + ":" SecurityUtil.setTokenService(token, addr);
+ addr.getPort()); LOG.info("Service for token is " + token.getService());
token.setService(host);
LOG.info("Service IP address for token is " + host);
current.addToken(token); current.addToken(token);
current.doAs(new PrivilegedExceptionAction<Object>() { current.doAs(new PrivilegedExceptionAction<Object>() {
@Override @Override

View File

@ -26,6 +26,7 @@
import java.net.URI; import java.net.URI;
import java.security.PrivilegedExceptionAction; import java.security.PrivilegedExceptionAction;
import java.util.Collection; import java.util.Collection;
import java.util.HashSet;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
@ -44,11 +45,13 @@
import org.apache.hadoop.hdfs.server.namenode.NameNodeAdapter; import org.apache.hadoop.hdfs.server.namenode.NameNodeAdapter;
import org.apache.hadoop.io.Text; import org.apache.hadoop.io.Text;
import org.apache.hadoop.security.SecurityUtil; import org.apache.hadoop.security.SecurityUtil;
import org.apache.hadoop.security.SecurityUtilTestHelper;
import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.Token; import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.security.token.TokenIdentifier; import org.apache.hadoop.security.token.TokenIdentifier;
import org.apache.hadoop.test.GenericTestUtils; import org.apache.hadoop.test.GenericTestUtils;
import org.junit.AfterClass; import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass; import org.junit.BeforeClass;
import org.junit.Test; import org.junit.Test;
@ -100,6 +103,11 @@ public static void shutdownCluster() throws IOException {
} }
@Before
public void prepTest() {
SecurityUtilTestHelper.setTokenServiceUseIp(true);
}
@Test @Test
public void testDelegationTokenDFSApi() throws Exception { public void testDelegationTokenDFSApi() throws Exception {
Token<DelegationTokenIdentifier> token = dfs.getDelegationToken("JobTracker"); Token<DelegationTokenIdentifier> token = dfs.getDelegationToken("JobTracker");
@ -187,23 +195,48 @@ public void testHAUtilClonesDelegationTokens() throws Exception {
URI haUri = new URI("hdfs://my-ha-uri/"); URI haUri = new URI("hdfs://my-ha-uri/");
token.setService(HAUtil.buildTokenServiceForLogicalUri(haUri)); token.setService(HAUtil.buildTokenServiceForLogicalUri(haUri));
ugi.addToken(token); ugi.addToken(token);
HAUtil.cloneDelegationTokenForLogicalUri(ugi, haUri, nn0.getNameNodeAddress());
HAUtil.cloneDelegationTokenForLogicalUri(ugi, haUri, nn1.getNameNodeAddress()); Collection<InetSocketAddress> nnAddrs = new HashSet<InetSocketAddress>();
nnAddrs.add(nn0.getNameNodeAddress());
nnAddrs.add(nn1.getNameNodeAddress());
HAUtil.cloneDelegationTokenForLogicalUri(ugi, haUri, nnAddrs);
Collection<Token<? extends TokenIdentifier>> tokens = ugi.getTokens(); Collection<Token<? extends TokenIdentifier>> tokens = ugi.getTokens();
assertEquals(3, tokens.size()); assertEquals(3, tokens.size());
LOG.info("Tokens:\n" + Joiner.on("\n").join(tokens)); LOG.info("Tokens:\n" + Joiner.on("\n").join(tokens));
DelegationTokenSelector dts = new DelegationTokenSelector();
// check that the token selected for one of the physical IPC addresses // check that the token selected for one of the physical IPC addresses
// matches the one we received // matches the one we received
InetSocketAddress addr = nn0.getNameNodeAddress(); for (InetSocketAddress addr : nnAddrs) {
Text ipcDtService = SecurityUtil.buildTokenService(addr); Text ipcDtService = SecurityUtil.buildTokenService(addr);
Token<DelegationTokenIdentifier> token2 = Token<DelegationTokenIdentifier> token2 =
DelegationTokenSelector.selectHdfsDelegationToken(ipcDtService, ugi); dts.selectToken(ipcDtService, ugi.getTokens());
assertNotNull(token2); assertNotNull(token2);
assertArrayEquals(token.getIdentifier(), token2.getIdentifier()); assertArrayEquals(token.getIdentifier(), token2.getIdentifier());
assertArrayEquals(token.getPassword(), token2.getPassword()); assertArrayEquals(token.getPassword(), token2.getPassword());
}
// switch to host-based tokens, shouldn't match existing tokens
SecurityUtilTestHelper.setTokenServiceUseIp(false);
for (InetSocketAddress addr : nnAddrs) {
Text ipcDtService = SecurityUtil.buildTokenService(addr);
Token<DelegationTokenIdentifier> token2 =
dts.selectToken(ipcDtService, ugi.getTokens());
assertNull(token2);
}
// reclone the tokens, and see if they match now
HAUtil.cloneDelegationTokenForLogicalUri(ugi, haUri, nnAddrs);
for (InetSocketAddress addr : nnAddrs) {
Text ipcDtService = SecurityUtil.buildTokenService(addr);
Token<DelegationTokenIdentifier> token2 =
dts.selectToken(ipcDtService, ugi.getTokens());
assertNotNull(token2);
assertArrayEquals(token.getIdentifier(), token2.getIdentifier());
assertArrayEquals(token.getPassword(), token2.getPassword());
}
} }
/** /**

View File

@ -34,10 +34,16 @@
import org.apache.hadoop.hdfs.web.resources.HttpOpParam; import org.apache.hadoop.hdfs.web.resources.HttpOpParam;
import org.apache.hadoop.hdfs.web.resources.PutOpParam; import org.apache.hadoop.hdfs.web.resources.PutOpParam;
import org.apache.hadoop.io.Text; import org.apache.hadoop.io.Text;
import org.apache.hadoop.security.SecurityUtilTestHelper;
import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.Token; import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.security.token.TokenIdentifier;
import org.junit.Assert; import org.junit.Assert;
import org.junit.Test; import org.junit.Test;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.mockito.Mockito.mock; import static org.mockito.Mockito.mock;
public class TestWebHdfsUrl { public class TestWebHdfsUrl {
@ -90,4 +96,60 @@ public void testDelegationTokenInUrl() throws IOException {
private String generateUrlQueryPrefix(HttpOpParam.Op op, String username) { private String generateUrlQueryPrefix(HttpOpParam.Op op, String username) {
return "op=" + op.toString() + "&user.name=" + username; return "op=" + op.toString() + "&user.name=" + username;
} }
@Test
public void testSelectDelegationToken() throws Exception {
SecurityUtilTestHelper.setTokenServiceUseIp(true);
Configuration conf = new Configuration();
URI webHdfsUri = URI.create("webhdfs://localhost:0");
UserGroupInformation ugi = UserGroupInformation.getCurrentUser();
Token<?> token = null;
// test fallback to hdfs token
Token<?> hdfsToken = new Token<TokenIdentifier>(
new byte[0], new byte[0],
DelegationTokenIdentifier.HDFS_DELEGATION_KIND,
new Text("127.0.0.1:8020"));
ugi.addToken(hdfsToken);
WebHdfsFileSystem fs = (WebHdfsFileSystem) FileSystem.get(webHdfsUri, conf);
token = fs.selectDelegationToken();
assertNotNull(token);
assertEquals(hdfsToken, token);
// test webhdfs is favored over hdfs
Token<?> webHdfsToken = new Token<TokenIdentifier>(
new byte[0], new byte[0],
WebHdfsFileSystem.TOKEN_KIND, new Text("127.0.0.1:0"));
ugi.addToken(webHdfsToken);
token = fs.selectDelegationToken();
assertNotNull(token);
assertEquals(webHdfsToken, token);
// switch to using host-based tokens, no token should match
SecurityUtilTestHelper.setTokenServiceUseIp(false);
token = fs.selectDelegationToken();
assertNull(token);
// test fallback to hdfs token
hdfsToken = new Token<TokenIdentifier>(
new byte[0], new byte[0],
DelegationTokenIdentifier.HDFS_DELEGATION_KIND,
new Text("localhost:8020"));
ugi.addToken(hdfsToken);
token = fs.selectDelegationToken();
assertNotNull(token);
assertEquals(hdfsToken, token);
// test webhdfs is favored over hdfs
webHdfsToken = new Token<TokenIdentifier>(
new byte[0], new byte[0],
WebHdfsFileSystem.TOKEN_KIND, new Text("localhost:0"));
ugi.addToken(webHdfsToken);
token = fs.selectDelegationToken();
assertNotNull(token);
assertEquals(webHdfsToken, token);
}
} }