HDFS-13972. RBF: Support for Delegation Token (WebHDFS). Contributed by CR Hota.
This commit is contained in:
parent
e7e48a4e96
commit
506d073482
|
@ -37,6 +37,8 @@ import org.apache.hadoop.classification.InterfaceStability;
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
import org.apache.hadoop.hdfs.DFSUtil;
|
import org.apache.hadoop.hdfs.DFSUtil;
|
||||||
import org.apache.hadoop.hdfs.HAUtil;
|
import org.apache.hadoop.hdfs.HAUtil;
|
||||||
|
import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier;
|
||||||
|
import org.apache.hadoop.hdfs.server.common.TokenVerifier;
|
||||||
import org.apache.hadoop.hdfs.server.federation.metrics.FederationMetrics;
|
import org.apache.hadoop.hdfs.server.federation.metrics.FederationMetrics;
|
||||||
import org.apache.hadoop.hdfs.server.federation.metrics.NamenodeBeanMetrics;
|
import org.apache.hadoop.hdfs.server.federation.metrics.NamenodeBeanMetrics;
|
||||||
import org.apache.hadoop.hdfs.server.federation.resolver.ActiveNamenodeResolver;
|
import org.apache.hadoop.hdfs.server.federation.resolver.ActiveNamenodeResolver;
|
||||||
|
@ -76,7 +78,8 @@ import com.google.common.annotations.VisibleForTesting;
|
||||||
*/
|
*/
|
||||||
@InterfaceAudience.Private
|
@InterfaceAudience.Private
|
||||||
@InterfaceStability.Evolving
|
@InterfaceStability.Evolving
|
||||||
public class Router extends CompositeService {
|
public class Router extends CompositeService implements
|
||||||
|
TokenVerifier<DelegationTokenIdentifier> {
|
||||||
|
|
||||||
private static final Logger LOG = LoggerFactory.getLogger(Router.class);
|
private static final Logger LOG = LoggerFactory.getLogger(Router.class);
|
||||||
|
|
||||||
|
@ -470,6 +473,12 @@ public class Router extends CompositeService {
|
||||||
return null;
|
return null;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void verifyToken(DelegationTokenIdentifier tokenId, byte[] password)
|
||||||
|
throws IOException {
|
||||||
|
getRpcServer().getRouterSecurityManager().verifyToken(tokenId, password);
|
||||||
|
}
|
||||||
|
|
||||||
/////////////////////////////////////////////////////////
|
/////////////////////////////////////////////////////////
|
||||||
// Namenode heartbeat monitors
|
// Namenode heartbeat monitors
|
||||||
/////////////////////////////////////////////////////////
|
/////////////////////////////////////////////////////////
|
||||||
|
|
|
@ -203,6 +203,9 @@ public class RouterRpcServer extends AbstractService
|
||||||
private final RouterClientProtocol clientProto;
|
private final RouterClientProtocol clientProto;
|
||||||
/** Router security manager to handle token operations. */
|
/** Router security manager to handle token operations. */
|
||||||
private RouterSecurityManager securityManager = null;
|
private RouterSecurityManager securityManager = null;
|
||||||
|
/** Super user credentials that a thread may use. */
|
||||||
|
private static final ThreadLocal<UserGroupInformation> CUR_USER =
|
||||||
|
new ThreadLocal<>();
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Construct a router RPC server.
|
* Construct a router RPC server.
|
||||||
|
@ -1514,10 +1517,25 @@ public class RouterRpcServer extends AbstractService
|
||||||
* @throws IOException If we cannot get the user information.
|
* @throws IOException If we cannot get the user information.
|
||||||
*/
|
*/
|
||||||
public static UserGroupInformation getRemoteUser() throws IOException {
|
public static UserGroupInformation getRemoteUser() throws IOException {
|
||||||
UserGroupInformation ugi = Server.getRemoteUser();
|
UserGroupInformation ugi = CUR_USER.get();
|
||||||
|
ugi = (ugi != null) ? ugi : Server.getRemoteUser();
|
||||||
return (ugi != null) ? ugi : UserGroupInformation.getCurrentUser();
|
return (ugi != null) ? ugi : UserGroupInformation.getCurrentUser();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Set super user credentials if needed.
|
||||||
|
*/
|
||||||
|
static void setCurrentUser(UserGroupInformation ugi) {
|
||||||
|
CUR_USER.set(ugi);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Reset to discard super user credentials.
|
||||||
|
*/
|
||||||
|
static void resetCurrentUser() {
|
||||||
|
CUR_USER.set(null);
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Merge the outputs from multiple namespaces.
|
* Merge the outputs from multiple namespaces.
|
||||||
*
|
*
|
||||||
|
|
|
@ -19,7 +19,6 @@ package org.apache.hadoop.hdfs.server.federation.router;
|
||||||
|
|
||||||
import static org.apache.hadoop.util.StringUtils.getTrimmedStringCollection;
|
import static org.apache.hadoop.util.StringUtils.getTrimmedStringCollection;
|
||||||
|
|
||||||
import org.apache.hadoop.conf.Configuration;
|
|
||||||
import org.apache.hadoop.hdfs.protocol.ClientProtocol;
|
import org.apache.hadoop.hdfs.protocol.ClientProtocol;
|
||||||
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
|
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
|
||||||
import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
|
import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
|
||||||
|
@ -27,13 +26,10 @@ import org.apache.hadoop.hdfs.protocol.LocatedBlock;
|
||||||
import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
|
import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
|
||||||
import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType;
|
import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType;
|
||||||
import org.apache.hadoop.hdfs.server.common.JspHelper;
|
import org.apache.hadoop.hdfs.server.common.JspHelper;
|
||||||
import org.apache.hadoop.hdfs.server.federation.resolver.ActiveNamenodeResolver;
|
import org.apache.hadoop.hdfs.server.federation.router.security.RouterSecurityManager;
|
||||||
import org.apache.hadoop.hdfs.server.federation.resolver.FederationNamenodeContext;
|
|
||||||
import org.apache.hadoop.hdfs.server.federation.resolver.RemoteLocation;
|
|
||||||
import org.apache.hadoop.hdfs.server.namenode.web.resources.NamenodeWebHdfsMethods;
|
import org.apache.hadoop.hdfs.server.namenode.web.resources.NamenodeWebHdfsMethods;
|
||||||
|
|
||||||
import javax.servlet.http.HttpServletRequest;
|
import javax.servlet.http.HttpServletRequest;
|
||||||
import javax.servlet.http.HttpServletResponse;
|
|
||||||
import javax.ws.rs.Path;
|
import javax.ws.rs.Path;
|
||||||
import javax.ws.rs.core.Context;
|
import javax.ws.rs.core.Context;
|
||||||
import javax.ws.rs.core.MediaType;
|
import javax.ws.rs.core.MediaType;
|
||||||
|
@ -42,7 +38,6 @@ import javax.ws.rs.core.Response;
|
||||||
import com.sun.jersey.spi.container.ResourceFilters;
|
import com.sun.jersey.spi.container.ResourceFilters;
|
||||||
import org.apache.hadoop.hdfs.web.JsonUtil;
|
import org.apache.hadoop.hdfs.web.JsonUtil;
|
||||||
import org.apache.hadoop.hdfs.web.ParamFilter;
|
import org.apache.hadoop.hdfs.web.ParamFilter;
|
||||||
import org.apache.hadoop.hdfs.web.URLConnectionFactory;
|
|
||||||
import org.apache.hadoop.hdfs.web.WebHdfsFileSystem;
|
import org.apache.hadoop.hdfs.web.WebHdfsFileSystem;
|
||||||
import org.apache.hadoop.hdfs.web.resources.AccessTimeParam;
|
import org.apache.hadoop.hdfs.web.resources.AccessTimeParam;
|
||||||
import org.apache.hadoop.hdfs.web.resources.AclPermissionParam;
|
import org.apache.hadoop.hdfs.web.resources.AclPermissionParam;
|
||||||
|
@ -91,6 +86,7 @@ import org.apache.hadoop.hdfs.web.resources.XAttrValueParam;
|
||||||
import org.apache.hadoop.ipc.ExternalCall;
|
import org.apache.hadoop.ipc.ExternalCall;
|
||||||
import org.apache.hadoop.ipc.RetriableException;
|
import org.apache.hadoop.ipc.RetriableException;
|
||||||
import org.apache.hadoop.net.Node;
|
import org.apache.hadoop.net.Node;
|
||||||
|
import org.apache.hadoop.security.Credentials;
|
||||||
import org.apache.hadoop.security.UserGroupInformation;
|
import org.apache.hadoop.security.UserGroupInformation;
|
||||||
import org.apache.hadoop.security.token.Token;
|
import org.apache.hadoop.security.token.Token;
|
||||||
import org.apache.hadoop.security.token.TokenIdentifier;
|
import org.apache.hadoop.security.token.TokenIdentifier;
|
||||||
|
@ -99,12 +95,8 @@ import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
import java.io.FileNotFoundException;
|
import java.io.FileNotFoundException;
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.net.HttpURLConnection;
|
|
||||||
import java.net.URI;
|
import java.net.URI;
|
||||||
import java.net.URISyntaxException;
|
import java.net.URISyntaxException;
|
||||||
import java.net.URL;
|
|
||||||
import java.net.URLDecoder;
|
|
||||||
import java.security.PrivilegedAction;
|
|
||||||
import java.util.Collection;
|
import java.util.Collection;
|
||||||
import java.util.HashSet;
|
import java.util.HashSet;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
|
@ -224,7 +216,11 @@ public class RouterWebHdfsMethods extends NamenodeWebHdfsMethods {
|
||||||
case CREATE:
|
case CREATE:
|
||||||
{
|
{
|
||||||
final Router router = getRouter();
|
final Router router = getRouter();
|
||||||
final URI uri = redirectURI(router, fullpath);
|
final URI uri = redirectURI(router, ugi, delegation, username,
|
||||||
|
doAsUser, fullpath, op.getValue(), -1L,
|
||||||
|
exclDatanodes.getValue(), permission, unmaskedPermission,
|
||||||
|
overwrite, bufferSize, replication, blockSize, createParent,
|
||||||
|
createFlagParam);
|
||||||
if (!noredirectParam.getValue()) {
|
if (!noredirectParam.getValue()) {
|
||||||
return Response.temporaryRedirect(uri)
|
return Response.temporaryRedirect(uri)
|
||||||
.type(MediaType.APPLICATION_OCTET_STREAM).build();
|
.type(MediaType.APPLICATION_OCTET_STREAM).build();
|
||||||
|
@ -366,6 +362,7 @@ public class RouterWebHdfsMethods extends NamenodeWebHdfsMethods {
|
||||||
return Response.ok(js).type(MediaType.APPLICATION_JSON).build();
|
return Response.ok(js).type(MediaType.APPLICATION_JSON).build();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
case GETDELEGATIONTOKEN:
|
||||||
case GET_BLOCK_LOCATIONS:
|
case GET_BLOCK_LOCATIONS:
|
||||||
case GETFILESTATUS:
|
case GETFILESTATUS:
|
||||||
case LISTSTATUS:
|
case LISTSTATUS:
|
||||||
|
@ -389,104 +386,6 @@ public class RouterWebHdfsMethods extends NamenodeWebHdfsMethods {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
|
||||||
* Get the redirect URI from the Namenode responsible for a path.
|
|
||||||
* @param router Router to check.
|
|
||||||
* @param path Path to get location for.
|
|
||||||
* @return URI returned by the Namenode.
|
|
||||||
* @throws IOException If it cannot get the redirect URI.
|
|
||||||
*/
|
|
||||||
private URI redirectURI(final Router router, final String path)
|
|
||||||
throws IOException {
|
|
||||||
// Forward the request to the proper Namenode
|
|
||||||
final HttpURLConnection conn = forwardRequest(router, path);
|
|
||||||
try {
|
|
||||||
conn.setInstanceFollowRedirects(false);
|
|
||||||
conn.setDoOutput(true);
|
|
||||||
conn.connect();
|
|
||||||
|
|
||||||
// Read the reply from the Namenode
|
|
||||||
int responseCode = conn.getResponseCode();
|
|
||||||
if (responseCode != HttpServletResponse.SC_TEMPORARY_REDIRECT) {
|
|
||||||
LOG.info("We expected a redirection from the Namenode, not {}",
|
|
||||||
responseCode);
|
|
||||||
return null;
|
|
||||||
}
|
|
||||||
|
|
||||||
// Extract the redirect location and return it
|
|
||||||
String redirectLocation = conn.getHeaderField("Location");
|
|
||||||
try {
|
|
||||||
// We modify the namenode location and the path
|
|
||||||
redirectLocation = redirectLocation
|
|
||||||
.replaceAll("(?<=[?&;])namenoderpcaddress=.*?(?=[&;])",
|
|
||||||
"namenoderpcaddress=" + router.getRouterId())
|
|
||||||
.replaceAll("(?<=[/])webhdfs/v1/.*?(?=[?])",
|
|
||||||
"webhdfs/v1" + path);
|
|
||||||
return new URI(redirectLocation);
|
|
||||||
} catch (URISyntaxException e) {
|
|
||||||
LOG.error("Cannot parse redirect location {}", redirectLocation);
|
|
||||||
}
|
|
||||||
} finally {
|
|
||||||
if (conn != null) {
|
|
||||||
conn.disconnect();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return null;
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Forwards a request to a subcluster.
|
|
||||||
* @param router Router to check.
|
|
||||||
* @param path Path in HDFS.
|
|
||||||
* @return Reply from the subcluster.
|
|
||||||
* @throws IOException
|
|
||||||
*/
|
|
||||||
private HttpURLConnection forwardRequest(
|
|
||||||
final Router router, final String path) throws IOException {
|
|
||||||
final Configuration conf =
|
|
||||||
(Configuration)getContext().getAttribute(JspHelper.CURRENT_CONF);
|
|
||||||
URLConnectionFactory connectionFactory =
|
|
||||||
URLConnectionFactory.newDefaultURLConnectionFactory(conf);
|
|
||||||
|
|
||||||
// Find the namespace responsible for a path
|
|
||||||
final RouterRpcServer rpcServer = getRPCServer(router);
|
|
||||||
RemoteLocation createLoc = rpcServer.getCreateLocation(path);
|
|
||||||
String nsId = createLoc.getNameserviceId();
|
|
||||||
String dest = createLoc.getDest();
|
|
||||||
ActiveNamenodeResolver nnResolver = router.getNamenodeResolver();
|
|
||||||
List<? extends FederationNamenodeContext> namenodes =
|
|
||||||
nnResolver.getNamenodesForNameserviceId(nsId);
|
|
||||||
|
|
||||||
// Go over the namenodes responsible for that namespace
|
|
||||||
for (FederationNamenodeContext namenode : namenodes) {
|
|
||||||
try {
|
|
||||||
// Generate the request for the namenode
|
|
||||||
String nnWebAddress = namenode.getWebAddress();
|
|
||||||
String[] nnWebAddressSplit = nnWebAddress.split(":");
|
|
||||||
String host = nnWebAddressSplit[0];
|
|
||||||
int port = Integer.parseInt(nnWebAddressSplit[1]);
|
|
||||||
|
|
||||||
// Avoid double-encoding here
|
|
||||||
query = URLDecoder.decode(query, "UTF-8");
|
|
||||||
URI uri = new URI(getScheme(), null, host, port,
|
|
||||||
reqPath + dest, query, null);
|
|
||||||
URL url = uri.toURL();
|
|
||||||
|
|
||||||
// Send a request to the proper Namenode
|
|
||||||
final HttpURLConnection conn =
|
|
||||||
(HttpURLConnection)connectionFactory.openConnection(url);
|
|
||||||
conn.setRequestMethod(method);
|
|
||||||
|
|
||||||
connectionFactory.destroy();
|
|
||||||
return conn;
|
|
||||||
} catch (Exception e) {
|
|
||||||
LOG.error("Cannot redirect request to {}", namenode, e);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
connectionFactory.destroy();
|
|
||||||
return null;
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Get a URI to redirect an operation to.
|
* Get a URI to redirect an operation to.
|
||||||
* @param router Router to check.
|
* @param router Router to check.
|
||||||
|
@ -526,7 +425,7 @@ public class RouterWebHdfsMethods extends NamenodeWebHdfsMethods {
|
||||||
} else {
|
} else {
|
||||||
// generate a token
|
// generate a token
|
||||||
final Token<? extends TokenIdentifier> t = generateDelegationToken(
|
final Token<? extends TokenIdentifier> t = generateDelegationToken(
|
||||||
router, ugi, request.getUserPrincipal().getName());
|
ugi, ugi.getUserName());
|
||||||
delegationQuery = "&delegation=" + t.encodeToUrlString();
|
delegationQuery = "&delegation=" + t.encodeToUrlString();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -552,19 +451,17 @@ public class RouterWebHdfsMethods extends NamenodeWebHdfsMethods {
|
||||||
// We need to get the DNs as a privileged user
|
// We need to get the DNs as a privileged user
|
||||||
final RouterRpcServer rpcServer = getRPCServer(router);
|
final RouterRpcServer rpcServer = getRPCServer(router);
|
||||||
UserGroupInformation loginUser = UserGroupInformation.getLoginUser();
|
UserGroupInformation loginUser = UserGroupInformation.getLoginUser();
|
||||||
|
RouterRpcServer.setCurrentUser(loginUser);
|
||||||
|
|
||||||
DatanodeInfo[] dns = loginUser.doAs(
|
DatanodeInfo[] dns = null;
|
||||||
new PrivilegedAction<DatanodeInfo[]>() {
|
try {
|
||||||
@Override
|
dns = rpcServer.getDatanodeReport(DatanodeReportType.LIVE);
|
||||||
public DatanodeInfo[] run() {
|
} catch (IOException e) {
|
||||||
try {
|
LOG.error("Cannot get the datanodes from the RPC server", e);
|
||||||
return rpcServer.getDatanodeReport(DatanodeReportType.LIVE);
|
} finally {
|
||||||
} catch (IOException e) {
|
// Reset ugi to remote user for remaining operations.
|
||||||
LOG.error("Cannot get the datanodes from the RPC server", e);
|
RouterRpcServer.resetCurrentUser();
|
||||||
return null;
|
}
|
||||||
}
|
|
||||||
}
|
|
||||||
});
|
|
||||||
|
|
||||||
HashSet<Node> excludes = new HashSet<Node>();
|
HashSet<Node> excludes = new HashSet<Node>();
|
||||||
if (excludeDatanodes != null) {
|
if (excludeDatanodes != null) {
|
||||||
|
@ -646,17 +543,19 @@ public class RouterWebHdfsMethods extends NamenodeWebHdfsMethods {
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Generate the delegation tokens for this request.
|
* Generate the credentials for this request.
|
||||||
* @param router Router.
|
|
||||||
* @param ugi User group information.
|
* @param ugi User group information.
|
||||||
* @param renewer Who is asking for the renewal.
|
* @param renewer Who is asking for the renewal.
|
||||||
* @return The delegation tokens.
|
* @return Credentials holding delegation token.
|
||||||
* @throws IOException If it cannot create the tokens.
|
* @throws IOException If it cannot create the credentials.
|
||||||
*/
|
*/
|
||||||
private Token<? extends TokenIdentifier> generateDelegationToken(
|
@Override
|
||||||
final Router router, final UserGroupInformation ugi,
|
public Credentials createCredentials(
|
||||||
|
final UserGroupInformation ugi,
|
||||||
final String renewer) throws IOException {
|
final String renewer) throws IOException {
|
||||||
throw new UnsupportedOperationException("TODO Generate token for ugi=" +
|
final Router router = (Router)getContext().getAttribute("name.node");
|
||||||
ugi + " request=" + request);
|
final Credentials c = RouterSecurityManager.createCredentials(router, ugi,
|
||||||
|
renewer != null? renewer: ugi.getShortUserName());
|
||||||
|
return c;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -24,8 +24,10 @@ import org.apache.hadoop.hdfs.DFSUtil;
|
||||||
import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier;
|
import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier;
|
||||||
import org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys;
|
import org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys;
|
||||||
import org.apache.hadoop.hdfs.server.federation.router.RouterRpcServer;
|
import org.apache.hadoop.hdfs.server.federation.router.RouterRpcServer;
|
||||||
|
import org.apache.hadoop.hdfs.server.federation.router.Router;
|
||||||
import org.apache.hadoop.io.Text;
|
import org.apache.hadoop.io.Text;
|
||||||
import org.apache.hadoop.security.AccessControlException;
|
import org.apache.hadoop.security.AccessControlException;
|
||||||
|
import org.apache.hadoop.security.Credentials;
|
||||||
import org.apache.hadoop.security.SecurityUtil;
|
import org.apache.hadoop.security.SecurityUtil;
|
||||||
import org.apache.hadoop.security.UserGroupInformation;
|
import org.apache.hadoop.security.UserGroupInformation;
|
||||||
import org.apache.hadoop.security.UserGroupInformation.AuthenticationMethod;
|
import org.apache.hadoop.security.UserGroupInformation.AuthenticationMethod;
|
||||||
|
@ -36,6 +38,7 @@ import org.slf4j.Logger;
|
||||||
import org.slf4j.LoggerFactory;
|
import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
|
import java.net.InetSocketAddress;
|
||||||
import java.lang.reflect.Constructor;
|
import java.lang.reflect.Constructor;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -183,6 +186,12 @@ public class RouterSecurityManager {
|
||||||
return token;
|
return token;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @param token token to renew
|
||||||
|
* @return new expiryTime of the token
|
||||||
|
* @throws SecretManager.InvalidToken if {@code token} is invalid
|
||||||
|
* @throws IOException on errors
|
||||||
|
*/
|
||||||
public long renewDelegationToken(Token<DelegationTokenIdentifier> token)
|
public long renewDelegationToken(Token<DelegationTokenIdentifier> token)
|
||||||
throws SecretManager.InvalidToken, IOException {
|
throws SecretManager.InvalidToken, IOException {
|
||||||
LOG.debug("Renew delegation token");
|
LOG.debug("Renew delegation token");
|
||||||
|
@ -211,6 +220,10 @@ public class RouterSecurityManager {
|
||||||
return expiryTime;
|
return expiryTime;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @param token token to cancel
|
||||||
|
* @throws IOException on error
|
||||||
|
*/
|
||||||
public void cancelDelegationToken(Token<DelegationTokenIdentifier> token)
|
public void cancelDelegationToken(Token<DelegationTokenIdentifier> token)
|
||||||
throws IOException {
|
throws IOException {
|
||||||
LOG.debug("Cancel delegation token");
|
LOG.debug("Cancel delegation token");
|
||||||
|
@ -233,6 +246,34 @@ public class RouterSecurityManager {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* A utility method for creating credentials.
|
||||||
|
* Used by web hdfs to return url encoded token.
|
||||||
|
*/
|
||||||
|
public static Credentials createCredentials(
|
||||||
|
final Router router, final UserGroupInformation ugi,
|
||||||
|
final String renewer) throws IOException {
|
||||||
|
final Token<DelegationTokenIdentifier> token =
|
||||||
|
router.getRpcServer().getDelegationToken(new Text(renewer));
|
||||||
|
if (token == null) {
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
final InetSocketAddress addr = router.getRpcServerAddress();
|
||||||
|
SecurityUtil.setTokenService(token, addr);
|
||||||
|
final Credentials c = new Credentials();
|
||||||
|
c.addToken(new Text(ugi.getShortUserName()), token);
|
||||||
|
return c;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Delegation token verification.
|
||||||
|
* Used by web hdfs to verify url encoded token.
|
||||||
|
*/
|
||||||
|
public void verifyToken(DelegationTokenIdentifier identifier,
|
||||||
|
byte[] password) throws SecretManager.InvalidToken {
|
||||||
|
this.dtSecretManager.verifyToken(identifier, password);
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Log status of delegation token related operation.
|
* Log status of delegation token related operation.
|
||||||
* Extend in future to use audit logger instead of local logging.
|
* Extend in future to use audit logger instead of local logging.
|
||||||
|
|
|
@ -83,9 +83,11 @@ public class TestRouterHDFSContractDelegationToken
|
||||||
assertTrue(identifier.getMaxDate() >= identifier.getIssueDate());
|
assertTrue(identifier.getMaxDate() >= identifier.getIssueDate());
|
||||||
|
|
||||||
// Renew delegation token
|
// Renew delegation token
|
||||||
token.renew(initSecurity());
|
long expiryTime = token.renew(initSecurity());
|
||||||
assertNotNull(token);
|
assertNotNull(token);
|
||||||
assertTrue(token.decodeIdentifier().getMaxDate() >= existingMaxTime);
|
assertEquals(existingMaxTime, token.decodeIdentifier().getMaxDate());
|
||||||
|
// Expiry time after renewal should never exceed max time of the token.
|
||||||
|
assertTrue(expiryTime <= existingMaxTime);
|
||||||
// Renewal should retain old master key id and sequence number
|
// Renewal should retain old master key id and sequence number
|
||||||
identifier = token.decodeIdentifier();
|
identifier = token.decodeIdentifier();
|
||||||
assertEquals(identifier.getMasterKeyId(), masterKeyId);
|
assertEquals(identifier.getMasterKeyId(), masterKeyId);
|
||||||
|
|
|
@ -0,0 +1,163 @@
|
||||||
|
/*
|
||||||
|
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
* you may not use this file except in compliance with the License.
|
||||||
|
* You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License. See accompanying LICENSE file.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.apache.hadoop.hdfs.server.federation.security;
|
||||||
|
|
||||||
|
import static org.junit.Assert.assertEquals;
|
||||||
|
import static org.junit.Assert.assertNotNull;
|
||||||
|
import static org.junit.Assert.assertTrue;
|
||||||
|
|
||||||
|
import java.io.ByteArrayInputStream;
|
||||||
|
import java.io.DataInputStream;
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.net.InetSocketAddress;
|
||||||
|
import java.net.URI;
|
||||||
|
import java.util.Enumeration;
|
||||||
|
import java.util.Properties;
|
||||||
|
|
||||||
|
import javax.servlet.FilterConfig;
|
||||||
|
import javax.servlet.ServletException;
|
||||||
|
|
||||||
|
import org.apache.hadoop.conf.Configuration;
|
||||||
|
import org.apache.hadoop.fs.FileSystem;
|
||||||
|
import org.apache.hadoop.fs.SWebHdfs;
|
||||||
|
import org.apache.hadoop.fs.contract.router.SecurityConfUtil;
|
||||||
|
import org.apache.hadoop.hdfs.DFSConfigKeys;
|
||||||
|
import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier;
|
||||||
|
import org.apache.hadoop.hdfs.server.federation.RouterConfigBuilder;
|
||||||
|
import org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys;
|
||||||
|
import org.apache.hadoop.hdfs.server.federation.router.Router;
|
||||||
|
import org.apache.hadoop.hdfs.web.WebHdfsFileSystem;
|
||||||
|
import org.apache.hadoop.security.authentication.server.AuthenticationFilter;
|
||||||
|
import org.apache.hadoop.security.authentication.server.PseudoAuthenticationHandler;
|
||||||
|
import org.apache.hadoop.security.token.SecretManager.InvalidToken;
|
||||||
|
import org.apache.hadoop.security.token.Token;
|
||||||
|
import org.apache.hadoop.test.LambdaTestUtils;
|
||||||
|
import org.junit.After;
|
||||||
|
import org.junit.Before;
|
||||||
|
import org.junit.Test;
|
||||||
|
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Test Delegation Tokens from the Router HTTP interface.
|
||||||
|
*/
|
||||||
|
public class TestRouterHttpDelegationToken {
|
||||||
|
|
||||||
|
private Router router;
|
||||||
|
private WebHdfsFileSystem fs;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Custom filter to be able to test auth methods and let the other ones go.
|
||||||
|
*/
|
||||||
|
public static final class NoAuthFilter extends AuthenticationFilter {
|
||||||
|
@Override
|
||||||
|
protected Properties getConfiguration(String configPrefix,
|
||||||
|
FilterConfig filterConfig) throws ServletException {
|
||||||
|
Properties props = new Properties();
|
||||||
|
Enumeration<?> names = filterConfig.getInitParameterNames();
|
||||||
|
while (names.hasMoreElements()) {
|
||||||
|
String name = (String) names.nextElement();
|
||||||
|
if (name.startsWith(configPrefix)) {
|
||||||
|
String value = filterConfig.getInitParameter(name);
|
||||||
|
props.put(name.substring(configPrefix.length()), value);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
props.put(AuthenticationFilter.AUTH_TYPE, "simple");
|
||||||
|
props.put(PseudoAuthenticationHandler.ANONYMOUS_ALLOWED, "true");
|
||||||
|
return props;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Before
|
||||||
|
public void setup() throws Exception {
|
||||||
|
Configuration conf = SecurityConfUtil.initSecurity();
|
||||||
|
conf.set(RBFConfigKeys.DFS_ROUTER_HTTP_ADDRESS_KEY, "0.0.0.0:0");
|
||||||
|
conf.set(RBFConfigKeys.DFS_ROUTER_HTTPS_ADDRESS_KEY, "0.0.0.0:0");
|
||||||
|
conf.set(RBFConfigKeys.DFS_ROUTER_RPC_ADDRESS_KEY, "0.0.0.0:0");
|
||||||
|
conf.set(DFSConfigKeys.DFS_WEBHDFS_AUTHENTICATION_FILTER_KEY,
|
||||||
|
NoAuthFilter.class.getName());
|
||||||
|
|
||||||
|
// Start routers with an RPC and HTTP service only
|
||||||
|
Configuration routerConf = new RouterConfigBuilder()
|
||||||
|
.rpc()
|
||||||
|
.http()
|
||||||
|
.build();
|
||||||
|
|
||||||
|
conf.addResource(routerConf);
|
||||||
|
router = new Router();
|
||||||
|
router.init(conf);
|
||||||
|
router.start();
|
||||||
|
|
||||||
|
InetSocketAddress webAddress = router.getHttpServerAddress();
|
||||||
|
URI webURI = new URI(SWebHdfs.SCHEME, null,
|
||||||
|
webAddress.getHostName(), webAddress.getPort(), null, null, null);
|
||||||
|
fs = (WebHdfsFileSystem)FileSystem.get(webURI, conf);
|
||||||
|
}
|
||||||
|
|
||||||
|
@After
|
||||||
|
public void cleanup() throws Exception {
|
||||||
|
if (router != null) {
|
||||||
|
router.stop();
|
||||||
|
router.close();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testGetDelegationToken() throws Exception {
|
||||||
|
final String renewer = "renewer0";
|
||||||
|
Token<?> token = fs.getDelegationToken(renewer);
|
||||||
|
assertNotNull(token);
|
||||||
|
|
||||||
|
DelegationTokenIdentifier tokenId =
|
||||||
|
getTokenIdentifier(token.getIdentifier());
|
||||||
|
assertEquals("router", tokenId.getOwner().toString());
|
||||||
|
assertEquals(renewer, tokenId.getRenewer().toString());
|
||||||
|
assertEquals("", tokenId.getRealUser().toString());
|
||||||
|
assertEquals("SWEBHDFS delegation", token.getKind().toString());
|
||||||
|
assertNotNull(token.getPassword());
|
||||||
|
|
||||||
|
InetSocketAddress webAddress = router.getHttpServerAddress();
|
||||||
|
assertEquals(webAddress.getHostName() + ":" + webAddress.getPort(),
|
||||||
|
token.getService().toString());
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testRenewDelegationToken() throws Exception {
|
||||||
|
Token<?> token = fs.getDelegationToken("router");
|
||||||
|
DelegationTokenIdentifier tokenId =
|
||||||
|
getTokenIdentifier(token.getIdentifier());
|
||||||
|
|
||||||
|
long t = fs.renewDelegationToken(token);
|
||||||
|
assertTrue(t + " should not be larger than " + tokenId.getMaxDate(),
|
||||||
|
t <= tokenId.getMaxDate());
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testCancelDelegationToken() throws Exception {
|
||||||
|
Token<?> token = fs.getDelegationToken("router");
|
||||||
|
fs.cancelDelegationToken(token);
|
||||||
|
LambdaTestUtils.intercept(InvalidToken.class,
|
||||||
|
"Renewal request for unknown token",
|
||||||
|
() -> fs.renewDelegationToken(token));
|
||||||
|
}
|
||||||
|
|
||||||
|
private DelegationTokenIdentifier getTokenIdentifier(byte[] id)
|
||||||
|
throws IOException {
|
||||||
|
DelegationTokenIdentifier identifier = new DelegationTokenIdentifier();
|
||||||
|
ByteArrayInputStream bais = new ByteArrayInputStream(id);
|
||||||
|
DataInputStream dais = new DataInputStream(bais);
|
||||||
|
identifier.readFields(dais);
|
||||||
|
return identifier;
|
||||||
|
}
|
||||||
|
}
|
|
@ -18,9 +18,15 @@
|
||||||
|
|
||||||
package org.apache.hadoop.hdfs.server.federation.security;
|
package org.apache.hadoop.hdfs.server.federation.security;
|
||||||
|
|
||||||
|
import org.apache.hadoop.conf.Configuration;
|
||||||
|
import org.apache.hadoop.fs.Path;
|
||||||
|
import org.apache.hadoop.fs.contract.router.RouterHDFSContract;
|
||||||
import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier;
|
import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier;
|
||||||
|
import org.apache.hadoop.hdfs.server.federation.RouterConfigBuilder;
|
||||||
import org.apache.hadoop.hdfs.server.federation.router.security.RouterSecurityManager;
|
import org.apache.hadoop.hdfs.server.federation.router.security.RouterSecurityManager;
|
||||||
|
import org.apache.hadoop.hdfs.server.federation.router.Router;
|
||||||
import org.apache.hadoop.io.Text;
|
import org.apache.hadoop.io.Text;
|
||||||
|
import org.apache.hadoop.security.Credentials;
|
||||||
import org.apache.hadoop.security.UserGroupInformation;
|
import org.apache.hadoop.security.UserGroupInformation;
|
||||||
import org.apache.hadoop.security.token.SecretManager;
|
import org.apache.hadoop.security.token.SecretManager;
|
||||||
import org.apache.hadoop.security.token.Token;
|
import org.apache.hadoop.security.token.Token;
|
||||||
|
@ -31,7 +37,10 @@ import org.junit.Rule;
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
import static org.junit.Assert.assertTrue;
|
import static org.junit.Assert.assertTrue;
|
||||||
import static org.junit.Assert.assertNotNull;
|
import static org.junit.Assert.assertNotNull;
|
||||||
|
import static org.junit.Assert.assertEquals;
|
||||||
|
import static org.apache.hadoop.fs.contract.router.SecurityConfUtil.initSecurity;
|
||||||
|
|
||||||
|
import org.hamcrest.core.StringContains;
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
|
|
||||||
import org.slf4j.Logger;
|
import org.slf4j.Logger;
|
||||||
|
@ -64,21 +73,19 @@ public class TestRouterSecurityManager {
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testDelegationTokens() throws IOException {
|
public void testDelegationTokens() throws IOException {
|
||||||
String[] groupsForTesting = new String[1];
|
UserGroupInformation.reset();
|
||||||
groupsForTesting[0] = "router_group";
|
|
||||||
UserGroupInformation.setLoginUser(UserGroupInformation
|
UserGroupInformation.setLoginUser(UserGroupInformation
|
||||||
.createUserForTesting("router", groupsForTesting));
|
.createUserForTesting("router", getUserGroupForTesting()));
|
||||||
|
|
||||||
// Get a delegation token
|
// Get a delegation token
|
||||||
Token<DelegationTokenIdentifier> token =
|
Token<DelegationTokenIdentifier> token =
|
||||||
securityManager.getDelegationToken(new Text("some_renewer"));
|
securityManager.getDelegationToken(new Text("some_renewer"));
|
||||||
assertNotNull(token);
|
assertNotNull(token);
|
||||||
|
|
||||||
// Renew the delegation token
|
// Renew the delegation token
|
||||||
UserGroupInformation.setLoginUser(UserGroupInformation
|
UserGroupInformation.setLoginUser(UserGroupInformation
|
||||||
.createUserForTesting("some_renewer", groupsForTesting));
|
.createUserForTesting("some_renewer", getUserGroupForTesting()));
|
||||||
long updatedExpirationTime = securityManager.renewDelegationToken(token);
|
long updatedExpirationTime = securityManager.renewDelegationToken(token);
|
||||||
assertTrue(updatedExpirationTime >= token.decodeIdentifier().getMaxDate());
|
assertTrue(updatedExpirationTime <= token.decodeIdentifier().getMaxDate());
|
||||||
|
|
||||||
// Cancel the delegation token
|
// Cancel the delegation token
|
||||||
securityManager.cancelDelegationToken(token);
|
securityManager.cancelDelegationToken(token);
|
||||||
|
@ -90,4 +97,71 @@ public class TestRouterSecurityManager {
|
||||||
// This throws an exception as token has been cancelled.
|
// This throws an exception as token has been cancelled.
|
||||||
securityManager.renewDelegationToken(token);
|
securityManager.renewDelegationToken(token);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testVerifyToken() throws IOException {
|
||||||
|
UserGroupInformation.reset();
|
||||||
|
UserGroupInformation.setLoginUser(UserGroupInformation
|
||||||
|
.createUserForTesting("router", getUserGroupForTesting()));
|
||||||
|
|
||||||
|
// Get a delegation token
|
||||||
|
Token<DelegationTokenIdentifier> token =
|
||||||
|
securityManager.getDelegationToken(new Text("some_renewer"));
|
||||||
|
assertNotNull(token);
|
||||||
|
|
||||||
|
// Verify the password in delegation token
|
||||||
|
securityManager.verifyToken(token.decodeIdentifier(),
|
||||||
|
token.getPassword());
|
||||||
|
|
||||||
|
// Verify an invalid password
|
||||||
|
String exceptionCause = "password doesn't match";
|
||||||
|
exceptionRule.expect(SecretManager.InvalidToken.class);
|
||||||
|
exceptionRule.expectMessage(
|
||||||
|
StringContains.containsString(exceptionCause));
|
||||||
|
|
||||||
|
securityManager.verifyToken(token.decodeIdentifier(), new byte[10]);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testCreateCredentials() throws Exception {
|
||||||
|
Configuration conf = initSecurity();
|
||||||
|
|
||||||
|
// Start routers with only an RPC service
|
||||||
|
Configuration routerConf = new RouterConfigBuilder()
|
||||||
|
.metrics()
|
||||||
|
.rpc()
|
||||||
|
.build();
|
||||||
|
|
||||||
|
conf.addResource(routerConf);
|
||||||
|
Router router = new Router();
|
||||||
|
router.init(conf);
|
||||||
|
router.start();
|
||||||
|
|
||||||
|
UserGroupInformation ugi =
|
||||||
|
UserGroupInformation.createUserForTesting(
|
||||||
|
"router", getUserGroupForTesting());
|
||||||
|
Credentials creds = RouterSecurityManager.createCredentials(
|
||||||
|
router, ugi, "some_renewer");
|
||||||
|
for (Token token : creds.getAllTokens()) {
|
||||||
|
assertNotNull(token);
|
||||||
|
// Verify properties of the token
|
||||||
|
assertEquals("HDFS_DELEGATION_TOKEN", token.getKind().toString());
|
||||||
|
DelegationTokenIdentifier identifier = (DelegationTokenIdentifier)
|
||||||
|
token.decodeIdentifier();
|
||||||
|
assertNotNull(identifier);
|
||||||
|
String owner = identifier.getOwner().toString();
|
||||||
|
// Windows will not reverse name lookup "127.0.0.1" to "localhost".
|
||||||
|
String host = Path.WINDOWS ? "127.0.0.1" : "localhost";
|
||||||
|
String expectedOwner = "router/"+ host + "@EXAMPLE.COM";
|
||||||
|
assertEquals(expectedOwner, owner);
|
||||||
|
assertEquals("some_renewer", identifier.getRenewer().toString());
|
||||||
|
}
|
||||||
|
RouterHDFSContract.destroyCluster();
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
private static String[] getUserGroupForTesting() {
|
||||||
|
String[] groupsForTesting = {"router_group"};
|
||||||
|
return groupsForTesting;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue