HDFS-13972. RBF: Support for Delegation Token (WebHDFS). Contributed by CR Hota.

This commit is contained in:
Brahma Reddy Battula 2019-04-24 19:35:03 +05:30
parent de7da9b69e
commit 021a43b1a4
7 changed files with 346 additions and 140 deletions

View File

@ -37,6 +37,8 @@ import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.DFSUtil;
import org.apache.hadoop.hdfs.HAUtil;
import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier;
import org.apache.hadoop.hdfs.server.common.TokenVerifier;
import org.apache.hadoop.hdfs.server.federation.metrics.FederationMetrics;
import org.apache.hadoop.hdfs.server.federation.metrics.NamenodeBeanMetrics;
import org.apache.hadoop.hdfs.server.federation.resolver.ActiveNamenodeResolver;
@ -76,7 +78,8 @@ import com.google.common.annotations.VisibleForTesting;
*/
@InterfaceAudience.Private
@InterfaceStability.Evolving
public class Router extends CompositeService {
public class Router extends CompositeService implements
TokenVerifier<DelegationTokenIdentifier> {
private static final Logger LOG = LoggerFactory.getLogger(Router.class);
@ -470,6 +473,12 @@ public class Router extends CompositeService {
return null;
}
@Override
public void verifyToken(DelegationTokenIdentifier tokenId, byte[] password)
throws IOException {
getRpcServer().getRouterSecurityManager().verifyToken(tokenId, password);
}
/////////////////////////////////////////////////////////
// Namenode heartbeat monitors
/////////////////////////////////////////////////////////

View File

@ -203,6 +203,9 @@ public class RouterRpcServer extends AbstractService
private final RouterClientProtocol clientProto;
/** Router security manager to handle token operations. */
private RouterSecurityManager securityManager = null;
/** Super user credentials that a thread may use. */
private static final ThreadLocal<UserGroupInformation> CUR_USER =
new ThreadLocal<>();
/**
* Construct a router RPC server.
@ -1514,10 +1517,25 @@ public class RouterRpcServer extends AbstractService
* @throws IOException If we cannot get the user information.
*/
public static UserGroupInformation getRemoteUser() throws IOException {
UserGroupInformation ugi = Server.getRemoteUser();
UserGroupInformation ugi = CUR_USER.get();
ugi = (ugi != null) ? ugi : Server.getRemoteUser();
return (ugi != null) ? ugi : UserGroupInformation.getCurrentUser();
}
/**
* Set super user credentials if needed.
*/
static void setCurrentUser(UserGroupInformation ugi) {
CUR_USER.set(ugi);
}
/**
* Reset to discard super user credentials.
*/
static void resetCurrentUser() {
CUR_USER.set(null);
}
/**
* Merge the outputs from multiple namespaces.
*

View File

@ -19,7 +19,6 @@ package org.apache.hadoop.hdfs.server.federation.router;
import static org.apache.hadoop.util.StringUtils.getTrimmedStringCollection;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.protocol.ClientProtocol;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
@ -27,13 +26,10 @@ import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType;
import org.apache.hadoop.hdfs.server.common.JspHelper;
import org.apache.hadoop.hdfs.server.federation.resolver.ActiveNamenodeResolver;
import org.apache.hadoop.hdfs.server.federation.resolver.FederationNamenodeContext;
import org.apache.hadoop.hdfs.server.federation.resolver.RemoteLocation;
import org.apache.hadoop.hdfs.server.federation.router.security.RouterSecurityManager;
import org.apache.hadoop.hdfs.server.namenode.web.resources.NamenodeWebHdfsMethods;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import javax.ws.rs.Path;
import javax.ws.rs.core.Context;
import javax.ws.rs.core.MediaType;
@ -42,7 +38,6 @@ import javax.ws.rs.core.Response;
import com.sun.jersey.spi.container.ResourceFilters;
import org.apache.hadoop.hdfs.web.JsonUtil;
import org.apache.hadoop.hdfs.web.ParamFilter;
import org.apache.hadoop.hdfs.web.URLConnectionFactory;
import org.apache.hadoop.hdfs.web.WebHdfsFileSystem;
import org.apache.hadoop.hdfs.web.resources.AccessTimeParam;
import org.apache.hadoop.hdfs.web.resources.AclPermissionParam;
@ -91,6 +86,7 @@ import org.apache.hadoop.hdfs.web.resources.XAttrValueParam;
import org.apache.hadoop.ipc.ExternalCall;
import org.apache.hadoop.ipc.RetriableException;
import org.apache.hadoop.net.Node;
import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.security.token.TokenIdentifier;
@ -99,12 +95,8 @@ import org.slf4j.LoggerFactory;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.net.HttpURLConnection;
import java.net.URI;
import java.net.URISyntaxException;
import java.net.URL;
import java.net.URLDecoder;
import java.security.PrivilegedAction;
import java.util.Collection;
import java.util.HashSet;
import java.util.List;
@ -224,7 +216,11 @@ public class RouterWebHdfsMethods extends NamenodeWebHdfsMethods {
case CREATE:
{
final Router router = getRouter();
final URI uri = redirectURI(router, fullpath);
final URI uri = redirectURI(router, ugi, delegation, username,
doAsUser, fullpath, op.getValue(), -1L,
exclDatanodes.getValue(), permission, unmaskedPermission,
overwrite, bufferSize, replication, blockSize, createParent,
createFlagParam);
if (!noredirectParam.getValue()) {
return Response.temporaryRedirect(uri)
.type(MediaType.APPLICATION_OCTET_STREAM).build();
@ -366,6 +362,7 @@ public class RouterWebHdfsMethods extends NamenodeWebHdfsMethods {
return Response.ok(js).type(MediaType.APPLICATION_JSON).build();
}
}
case GETDELEGATIONTOKEN:
case GET_BLOCK_LOCATIONS:
case GETFILESTATUS:
case LISTSTATUS:
@ -389,104 +386,6 @@ public class RouterWebHdfsMethods extends NamenodeWebHdfsMethods {
}
}
/**
* Get the redirect URI from the Namenode responsible for a path.
* @param router Router to check.
* @param path Path to get location for.
* @return URI returned by the Namenode.
* @throws IOException If it cannot get the redirect URI.
*/
private URI redirectURI(final Router router, final String path)
throws IOException {
// Forward the request to the proper Namenode
final HttpURLConnection conn = forwardRequest(router, path);
try {
conn.setInstanceFollowRedirects(false);
conn.setDoOutput(true);
conn.connect();
// Read the reply from the Namenode
int responseCode = conn.getResponseCode();
if (responseCode != HttpServletResponse.SC_TEMPORARY_REDIRECT) {
LOG.info("We expected a redirection from the Namenode, not {}",
responseCode);
return null;
}
// Extract the redirect location and return it
String redirectLocation = conn.getHeaderField("Location");
try {
// We modify the namenode location and the path
redirectLocation = redirectLocation
.replaceAll("(?<=[?&;])namenoderpcaddress=.*?(?=[&;])",
"namenoderpcaddress=" + router.getRouterId())
.replaceAll("(?<=[/])webhdfs/v1/.*?(?=[?])",
"webhdfs/v1" + path);
return new URI(redirectLocation);
} catch (URISyntaxException e) {
LOG.error("Cannot parse redirect location {}", redirectLocation);
}
} finally {
if (conn != null) {
conn.disconnect();
}
}
return null;
}
/**
* Forwards a request to a subcluster.
* @param router Router to check.
* @param path Path in HDFS.
* @return Reply from the subcluster.
* @throws IOException
*/
private HttpURLConnection forwardRequest(
final Router router, final String path) throws IOException {
final Configuration conf =
(Configuration)getContext().getAttribute(JspHelper.CURRENT_CONF);
URLConnectionFactory connectionFactory =
URLConnectionFactory.newDefaultURLConnectionFactory(conf);
// Find the namespace responsible for a path
final RouterRpcServer rpcServer = getRPCServer(router);
RemoteLocation createLoc = rpcServer.getCreateLocation(path);
String nsId = createLoc.getNameserviceId();
String dest = createLoc.getDest();
ActiveNamenodeResolver nnResolver = router.getNamenodeResolver();
List<? extends FederationNamenodeContext> namenodes =
nnResolver.getNamenodesForNameserviceId(nsId);
// Go over the namenodes responsible for that namespace
for (FederationNamenodeContext namenode : namenodes) {
try {
// Generate the request for the namenode
String nnWebAddress = namenode.getWebAddress();
String[] nnWebAddressSplit = nnWebAddress.split(":");
String host = nnWebAddressSplit[0];
int port = Integer.parseInt(nnWebAddressSplit[1]);
// Avoid double-encoding here
query = URLDecoder.decode(query, "UTF-8");
URI uri = new URI(getScheme(), null, host, port,
reqPath + dest, query, null);
URL url = uri.toURL();
// Send a request to the proper Namenode
final HttpURLConnection conn =
(HttpURLConnection)connectionFactory.openConnection(url);
conn.setRequestMethod(method);
connectionFactory.destroy();
return conn;
} catch (Exception e) {
LOG.error("Cannot redirect request to {}", namenode, e);
}
}
connectionFactory.destroy();
return null;
}
/**
* Get a URI to redirect an operation to.
* @param router Router to check.
@ -526,7 +425,7 @@ public class RouterWebHdfsMethods extends NamenodeWebHdfsMethods {
} else {
// generate a token
final Token<? extends TokenIdentifier> t = generateDelegationToken(
router, ugi, request.getUserPrincipal().getName());
ugi, ugi.getUserName());
delegationQuery = "&delegation=" + t.encodeToUrlString();
}
@ -552,19 +451,17 @@ public class RouterWebHdfsMethods extends NamenodeWebHdfsMethods {
// We need to get the DNs as a privileged user
final RouterRpcServer rpcServer = getRPCServer(router);
UserGroupInformation loginUser = UserGroupInformation.getLoginUser();
RouterRpcServer.setCurrentUser(loginUser);
DatanodeInfo[] dns = loginUser.doAs(
new PrivilegedAction<DatanodeInfo[]>() {
@Override
public DatanodeInfo[] run() {
try {
return rpcServer.getDatanodeReport(DatanodeReportType.LIVE);
} catch (IOException e) {
LOG.error("Cannot get the datanodes from the RPC server", e);
return null;
}
}
});
DatanodeInfo[] dns = null;
try {
dns = rpcServer.getDatanodeReport(DatanodeReportType.LIVE);
} catch (IOException e) {
LOG.error("Cannot get the datanodes from the RPC server", e);
} finally {
// Reset ugi to remote user for remaining operations.
RouterRpcServer.resetCurrentUser();
}
HashSet<Node> excludes = new HashSet<Node>();
if (excludeDatanodes != null) {
@ -646,17 +543,19 @@ public class RouterWebHdfsMethods extends NamenodeWebHdfsMethods {
}
/**
* Generate the delegation tokens for this request.
* @param router Router.
* Generate the credentials for this request.
* @param ugi User group information.
* @param renewer Who is asking for the renewal.
* @return The delegation tokens.
* @throws IOException If it cannot create the tokens.
* @return Credentials holding delegation token.
* @throws IOException If it cannot create the credentials.
*/
private Token<? extends TokenIdentifier> generateDelegationToken(
final Router router, final UserGroupInformation ugi,
@Override
public Credentials createCredentials(
final UserGroupInformation ugi,
final String renewer) throws IOException {
throw new UnsupportedOperationException("TODO Generate token for ugi=" +
ugi + " request=" + request);
final Router router = (Router)getContext().getAttribute("name.node");
final Credentials c = RouterSecurityManager.createCredentials(router, ugi,
renewer != null? renewer: ugi.getShortUserName());
return c;
}
}

View File

@ -24,8 +24,10 @@ import org.apache.hadoop.hdfs.DFSUtil;
import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier;
import org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys;
import org.apache.hadoop.hdfs.server.federation.router.RouterRpcServer;
import org.apache.hadoop.hdfs.server.federation.router.Router;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.security.AccessControlException;
import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.security.SecurityUtil;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.UserGroupInformation.AuthenticationMethod;
@ -36,6 +38,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.lang.reflect.Constructor;
/**
@ -183,6 +186,12 @@ public class RouterSecurityManager {
return token;
}
/**
* @param token token to renew
* @return new expiryTime of the token
* @throws SecretManager.InvalidToken if {@code token} is invalid
* @throws IOException on errors
*/
public long renewDelegationToken(Token<DelegationTokenIdentifier> token)
throws SecretManager.InvalidToken, IOException {
LOG.debug("Renew delegation token");
@ -211,6 +220,10 @@ public class RouterSecurityManager {
return expiryTime;
}
/**
* @param token token to cancel
* @throws IOException on error
*/
public void cancelDelegationToken(Token<DelegationTokenIdentifier> token)
throws IOException {
LOG.debug("Cancel delegation token");
@ -233,6 +246,34 @@ public class RouterSecurityManager {
}
}
/**
* A utility method for creating credentials.
* Used by web hdfs to return url encoded token.
*/
public static Credentials createCredentials(
final Router router, final UserGroupInformation ugi,
final String renewer) throws IOException {
final Token<DelegationTokenIdentifier> token =
router.getRpcServer().getDelegationToken(new Text(renewer));
if (token == null) {
return null;
}
final InetSocketAddress addr = router.getRpcServerAddress();
SecurityUtil.setTokenService(token, addr);
final Credentials c = new Credentials();
c.addToken(new Text(ugi.getShortUserName()), token);
return c;
}
/**
* Delegation token verification.
* Used by web hdfs to verify url encoded token.
*/
public void verifyToken(DelegationTokenIdentifier identifier,
byte[] password) throws SecretManager.InvalidToken {
this.dtSecretManager.verifyToken(identifier, password);
}
/**
* Log status of delegation token related operation.
* Extend in future to use audit logger instead of local logging.

View File

@ -83,9 +83,11 @@ public class TestRouterHDFSContractDelegationToken
assertTrue(identifier.getMaxDate() >= identifier.getIssueDate());
// Renew delegation token
token.renew(initSecurity());
long expiryTime = token.renew(initSecurity());
assertNotNull(token);
assertTrue(token.decodeIdentifier().getMaxDate() >= existingMaxTime);
assertEquals(existingMaxTime, token.decodeIdentifier().getMaxDate());
// Expiry time after renewal should never exceed max time of the token.
assertTrue(expiryTime <= existingMaxTime);
// Renewal should retain old master key id and sequence number
identifier = token.decodeIdentifier();
assertEquals(identifier.getMasterKeyId(), masterKeyId);

View File

@ -0,0 +1,163 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License. See accompanying LICENSE file.
*/
package org.apache.hadoop.hdfs.server.federation.security;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import java.io.ByteArrayInputStream;
import java.io.DataInputStream;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.URI;
import java.util.Enumeration;
import java.util.Properties;
import javax.servlet.FilterConfig;
import javax.servlet.ServletException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.SWebHdfs;
import org.apache.hadoop.fs.contract.router.SecurityConfUtil;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier;
import org.apache.hadoop.hdfs.server.federation.RouterConfigBuilder;
import org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys;
import org.apache.hadoop.hdfs.server.federation.router.Router;
import org.apache.hadoop.hdfs.web.WebHdfsFileSystem;
import org.apache.hadoop.security.authentication.server.AuthenticationFilter;
import org.apache.hadoop.security.authentication.server.PseudoAuthenticationHandler;
import org.apache.hadoop.security.token.SecretManager.InvalidToken;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.test.LambdaTestUtils;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
/**
* Test Delegation Tokens from the Router HTTP interface.
*/
public class TestRouterHttpDelegationToken {
private Router router;
private WebHdfsFileSystem fs;
/**
* Custom filter to be able to test auth methods and let the other ones go.
*/
public static final class NoAuthFilter extends AuthenticationFilter {
@Override
protected Properties getConfiguration(String configPrefix,
FilterConfig filterConfig) throws ServletException {
Properties props = new Properties();
Enumeration<?> names = filterConfig.getInitParameterNames();
while (names.hasMoreElements()) {
String name = (String) names.nextElement();
if (name.startsWith(configPrefix)) {
String value = filterConfig.getInitParameter(name);
props.put(name.substring(configPrefix.length()), value);
}
}
props.put(AuthenticationFilter.AUTH_TYPE, "simple");
props.put(PseudoAuthenticationHandler.ANONYMOUS_ALLOWED, "true");
return props;
}
}
@Before
public void setup() throws Exception {
Configuration conf = SecurityConfUtil.initSecurity();
conf.set(RBFConfigKeys.DFS_ROUTER_HTTP_ADDRESS_KEY, "0.0.0.0:0");
conf.set(RBFConfigKeys.DFS_ROUTER_HTTPS_ADDRESS_KEY, "0.0.0.0:0");
conf.set(RBFConfigKeys.DFS_ROUTER_RPC_ADDRESS_KEY, "0.0.0.0:0");
conf.set(DFSConfigKeys.DFS_WEBHDFS_AUTHENTICATION_FILTER_KEY,
NoAuthFilter.class.getName());
// Start routers with an RPC and HTTP service only
Configuration routerConf = new RouterConfigBuilder()
.rpc()
.http()
.build();
conf.addResource(routerConf);
router = new Router();
router.init(conf);
router.start();
InetSocketAddress webAddress = router.getHttpServerAddress();
URI webURI = new URI(SWebHdfs.SCHEME, null,
webAddress.getHostName(), webAddress.getPort(), null, null, null);
fs = (WebHdfsFileSystem)FileSystem.get(webURI, conf);
}
@After
public void cleanup() throws Exception {
if (router != null) {
router.stop();
router.close();
}
}
@Test
public void testGetDelegationToken() throws Exception {
final String renewer = "renewer0";
Token<?> token = fs.getDelegationToken(renewer);
assertNotNull(token);
DelegationTokenIdentifier tokenId =
getTokenIdentifier(token.getIdentifier());
assertEquals("router", tokenId.getOwner().toString());
assertEquals(renewer, tokenId.getRenewer().toString());
assertEquals("", tokenId.getRealUser().toString());
assertEquals("SWEBHDFS delegation", token.getKind().toString());
assertNotNull(token.getPassword());
InetSocketAddress webAddress = router.getHttpServerAddress();
assertEquals(webAddress.getHostName() + ":" + webAddress.getPort(),
token.getService().toString());
}
@Test
public void testRenewDelegationToken() throws Exception {
Token<?> token = fs.getDelegationToken("router");
DelegationTokenIdentifier tokenId =
getTokenIdentifier(token.getIdentifier());
long t = fs.renewDelegationToken(token);
assertTrue(t + " should not be larger than " + tokenId.getMaxDate(),
t <= tokenId.getMaxDate());
}
@Test
public void testCancelDelegationToken() throws Exception {
Token<?> token = fs.getDelegationToken("router");
fs.cancelDelegationToken(token);
LambdaTestUtils.intercept(InvalidToken.class,
"Renewal request for unknown token",
() -> fs.renewDelegationToken(token));
}
private DelegationTokenIdentifier getTokenIdentifier(byte[] id)
throws IOException {
DelegationTokenIdentifier identifier = new DelegationTokenIdentifier();
ByteArrayInputStream bais = new ByteArrayInputStream(id);
DataInputStream dais = new DataInputStream(bais);
identifier.readFields(dais);
return identifier;
}
}

View File

@ -18,9 +18,15 @@
package org.apache.hadoop.hdfs.server.federation.security;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.contract.router.RouterHDFSContract;
import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier;
import org.apache.hadoop.hdfs.server.federation.RouterConfigBuilder;
import org.apache.hadoop.hdfs.server.federation.router.security.RouterSecurityManager;
import org.apache.hadoop.hdfs.server.federation.router.Router;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.SecretManager;
import org.apache.hadoop.security.token.Token;
@ -31,7 +37,10 @@ import org.junit.Rule;
import org.junit.Test;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertEquals;
import static org.apache.hadoop.fs.contract.router.SecurityConfUtil.initSecurity;
import org.hamcrest.core.StringContains;
import java.io.IOException;
import org.slf4j.Logger;
@ -64,21 +73,19 @@ public class TestRouterSecurityManager {
@Test
public void testDelegationTokens() throws IOException {
String[] groupsForTesting = new String[1];
groupsForTesting[0] = "router_group";
UserGroupInformation.reset();
UserGroupInformation.setLoginUser(UserGroupInformation
.createUserForTesting("router", groupsForTesting));
.createUserForTesting("router", getUserGroupForTesting()));
// Get a delegation token
Token<DelegationTokenIdentifier> token =
securityManager.getDelegationToken(new Text("some_renewer"));
assertNotNull(token);
// Renew the delegation token
UserGroupInformation.setLoginUser(UserGroupInformation
.createUserForTesting("some_renewer", groupsForTesting));
.createUserForTesting("some_renewer", getUserGroupForTesting()));
long updatedExpirationTime = securityManager.renewDelegationToken(token);
assertTrue(updatedExpirationTime >= token.decodeIdentifier().getMaxDate());
assertTrue(updatedExpirationTime <= token.decodeIdentifier().getMaxDate());
// Cancel the delegation token
securityManager.cancelDelegationToken(token);
@ -90,4 +97,71 @@ public class TestRouterSecurityManager {
// This throws an exception as token has been cancelled.
securityManager.renewDelegationToken(token);
}
@Test
public void testVerifyToken() throws IOException {
UserGroupInformation.reset();
UserGroupInformation.setLoginUser(UserGroupInformation
.createUserForTesting("router", getUserGroupForTesting()));
// Get a delegation token
Token<DelegationTokenIdentifier> token =
securityManager.getDelegationToken(new Text("some_renewer"));
assertNotNull(token);
// Verify the password in delegation token
securityManager.verifyToken(token.decodeIdentifier(),
token.getPassword());
// Verify an invalid password
String exceptionCause = "password doesn't match";
exceptionRule.expect(SecretManager.InvalidToken.class);
exceptionRule.expectMessage(
StringContains.containsString(exceptionCause));
securityManager.verifyToken(token.decodeIdentifier(), new byte[10]);
}
@Test
public void testCreateCredentials() throws Exception {
Configuration conf = initSecurity();
// Start routers with only an RPC service
Configuration routerConf = new RouterConfigBuilder()
.metrics()
.rpc()
.build();
conf.addResource(routerConf);
Router router = new Router();
router.init(conf);
router.start();
UserGroupInformation ugi =
UserGroupInformation.createUserForTesting(
"router", getUserGroupForTesting());
Credentials creds = RouterSecurityManager.createCredentials(
router, ugi, "some_renewer");
for (Token token : creds.getAllTokens()) {
assertNotNull(token);
// Verify properties of the token
assertEquals("HDFS_DELEGATION_TOKEN", token.getKind().toString());
DelegationTokenIdentifier identifier = (DelegationTokenIdentifier)
token.decodeIdentifier();
assertNotNull(identifier);
String owner = identifier.getOwner().toString();
// Windows will not reverse name lookup "127.0.0.1" to "localhost".
String host = Path.WINDOWS ? "127.0.0.1" : "localhost";
String expectedOwner = "router/"+ host + "@EXAMPLE.COM";
assertEquals(expectedOwner, owner);
assertEquals("some_renewer", identifier.getRenewer().toString());
}
RouterHDFSContract.destroyCluster();
}
private static String[] getUserGroupForTesting() {
String[] groupsForTesting = {"router_group"};
return groupsForTesting;
}
}