HDFS-13358. RBF: Support for Delegation Token (RPC). Contributed by CR Hota.
This commit is contained in:
parent
bdacc8c831
commit
d8d6c9d324
|
@ -28,6 +28,8 @@ import org.apache.hadoop.hdfs.server.federation.resolver.MountTableResolver;
|
|||
import org.apache.hadoop.hdfs.server.federation.store.driver.StateStoreDriver;
|
||||
import org.apache.hadoop.hdfs.server.federation.store.driver.impl.StateStoreSerializerPBImpl;
|
||||
import org.apache.hadoop.hdfs.server.federation.store.driver.impl.StateStoreZooKeeperImpl;
|
||||
import org.apache.hadoop.security.token.delegation.AbstractDelegationTokenSecretManager;
|
||||
import org.apache.hadoop.hdfs.server.federation.router.security.token.ZKDelegationTokenSecretManagerImpl;
|
||||
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
|
@ -294,4 +296,11 @@ public class RBFConfigKeys extends CommonConfigurationKeysPublic {
|
|||
|
||||
public static final String DFS_ROUTER_KERBEROS_INTERNAL_SPNEGO_PRINCIPAL_KEY =
|
||||
FEDERATION_ROUTER_PREFIX + "kerberos.internal.spnego.principal";
|
||||
|
||||
// HDFS Router secret manager for delegation token
|
||||
public static final String DFS_ROUTER_DELEGATION_TOKEN_DRIVER_CLASS =
|
||||
FEDERATION_ROUTER_PREFIX + "secret.manager.class";
|
||||
public static final Class<? extends AbstractDelegationTokenSecretManager>
|
||||
DFS_ROUTER_DELEGATION_TOKEN_DRIVER_CLASS_DEFAULT =
|
||||
ZKDelegationTokenSecretManagerImpl.class;
|
||||
}
|
||||
|
|
|
@ -77,6 +77,7 @@ import org.apache.hadoop.hdfs.server.federation.resolver.FederationNamespaceInfo
|
|||
import org.apache.hadoop.hdfs.server.federation.resolver.FileSubclusterResolver;
|
||||
import org.apache.hadoop.hdfs.server.federation.resolver.MountTableResolver;
|
||||
import org.apache.hadoop.hdfs.server.federation.resolver.RemoteLocation;
|
||||
import org.apache.hadoop.hdfs.server.federation.router.security.RouterSecurityManager;
|
||||
import org.apache.hadoop.hdfs.server.federation.store.records.MountTable;
|
||||
import org.apache.hadoop.hdfs.server.namenode.NameNode;
|
||||
import org.apache.hadoop.hdfs.server.protocol.DatanodeStorageReport;
|
||||
|
@ -124,6 +125,8 @@ public class RouterClientProtocol implements ClientProtocol {
|
|||
private final ErasureCoding erasureCoding;
|
||||
/** StoragePolicy calls. **/
|
||||
private final RouterStoragePolicy storagePolicy;
|
||||
/** Router security manager to handle token operations. */
|
||||
private RouterSecurityManager securityManager = null;
|
||||
|
||||
RouterClientProtocol(Configuration conf, RouterRpcServer rpcServer) {
|
||||
this.rpcServer = rpcServer;
|
||||
|
@ -148,13 +151,14 @@ public class RouterClientProtocol implements ClientProtocol {
|
|||
DFSConfigKeys.DFS_PERMISSIONS_SUPERUSERGROUP_DEFAULT);
|
||||
this.erasureCoding = new ErasureCoding(rpcServer);
|
||||
this.storagePolicy = new RouterStoragePolicy(rpcServer);
|
||||
this.securityManager = rpcServer.getRouterSecurityManager();
|
||||
}
|
||||
|
||||
@Override
|
||||
public Token<DelegationTokenIdentifier> getDelegationToken(Text renewer)
|
||||
throws IOException {
|
||||
rpcServer.checkOperation(NameNode.OperationCategory.WRITE, false);
|
||||
return null;
|
||||
rpcServer.checkOperation(NameNode.OperationCategory.WRITE, true);
|
||||
return this.securityManager.getDelegationToken(renewer);
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -173,14 +177,16 @@ public class RouterClientProtocol implements ClientProtocol {
|
|||
@Override
|
||||
public long renewDelegationToken(Token<DelegationTokenIdentifier> token)
|
||||
throws IOException {
|
||||
rpcServer.checkOperation(NameNode.OperationCategory.WRITE, false);
|
||||
return 0;
|
||||
rpcServer.checkOperation(NameNode.OperationCategory.WRITE, true);
|
||||
return this.securityManager.renewDelegationToken(token);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void cancelDelegationToken(Token<DelegationTokenIdentifier> token)
|
||||
throws IOException {
|
||||
rpcServer.checkOperation(NameNode.OperationCategory.WRITE, false);
|
||||
rpcServer.checkOperation(NameNode.OperationCategory.WRITE, true);
|
||||
this.securityManager.cancelDelegationToken(token);
|
||||
return;
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -114,6 +114,7 @@ import org.apache.hadoop.hdfs.server.federation.resolver.MountTableResolver;
|
|||
import org.apache.hadoop.hdfs.server.federation.resolver.PathLocation;
|
||||
import org.apache.hadoop.hdfs.server.federation.resolver.RemoteLocation;
|
||||
import org.apache.hadoop.hdfs.server.federation.store.records.MountTable;
|
||||
import org.apache.hadoop.hdfs.server.federation.router.security.RouterSecurityManager;
|
||||
import org.apache.hadoop.hdfs.server.namenode.CheckpointSignature;
|
||||
import org.apache.hadoop.hdfs.server.namenode.LeaseExpiredException;
|
||||
import org.apache.hadoop.hdfs.server.namenode.NameNode.OperationCategory;
|
||||
|
@ -197,6 +198,8 @@ public class RouterRpcServer extends AbstractService
|
|||
private final RouterNamenodeProtocol nnProto;
|
||||
/** ClientProtocol calls. */
|
||||
private final RouterClientProtocol clientProto;
|
||||
/** Router security manager to handle token operations. */
|
||||
private RouterSecurityManager securityManager = null;
|
||||
|
||||
/**
|
||||
* Construct a router RPC server.
|
||||
|
@ -256,6 +259,9 @@ public class RouterRpcServer extends AbstractService
|
|||
LOG.info("RPC server binding to {} with {} handlers for Router {}",
|
||||
confRpcAddress, handlerCount, this.router.getRouterId());
|
||||
|
||||
// Create security manager
|
||||
this.securityManager = new RouterSecurityManager(this.conf);
|
||||
|
||||
this.rpcServer = new RPC.Builder(this.conf)
|
||||
.setProtocol(ClientNamenodeProtocolPB.class)
|
||||
.setInstance(clientNNPbService)
|
||||
|
@ -265,6 +271,7 @@ public class RouterRpcServer extends AbstractService
|
|||
.setnumReaders(readerCount)
|
||||
.setQueueSizePerHandler(handlerQueueSize)
|
||||
.setVerbose(false)
|
||||
.setSecretManager(this.securityManager.getSecretManager())
|
||||
.build();
|
||||
|
||||
// Add all the RPC protocols that the Router implements
|
||||
|
@ -344,9 +351,21 @@ public class RouterRpcServer extends AbstractService
|
|||
if (rpcMonitor != null) {
|
||||
this.rpcMonitor.close();
|
||||
}
|
||||
if (securityManager != null) {
|
||||
this.securityManager.stop();
|
||||
}
|
||||
super.serviceStop();
|
||||
}
|
||||
|
||||
/**
|
||||
* Get the RPC security manager.
|
||||
*
|
||||
* @return RPC security manager.
|
||||
*/
|
||||
public RouterSecurityManager getRouterSecurityManager() {
|
||||
return this.securityManager;
|
||||
}
|
||||
|
||||
/**
|
||||
* Get the RPC client to the Namenode.
|
||||
*
|
||||
|
@ -1457,7 +1476,7 @@ public class RouterRpcServer extends AbstractService
|
|||
* @return Remote user group information.
|
||||
* @throws IOException If we cannot get the user information.
|
||||
*/
|
||||
static UserGroupInformation getRemoteUser() throws IOException {
|
||||
public static UserGroupInformation getRemoteUser() throws IOException {
|
||||
UserGroupInformation ugi = Server.getRemoteUser();
|
||||
return (ugi != null) ? ugi : UserGroupInformation.getCurrentUser();
|
||||
}
|
||||
|
|
|
@ -0,0 +1,239 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.hdfs.server.federation.router.security;
|
||||
|
||||
import com.google.common.annotations.VisibleForTesting;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.hdfs.DFSUtil;
|
||||
import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier;
|
||||
import org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys;
|
||||
import org.apache.hadoop.hdfs.server.federation.router.RouterRpcServer;
|
||||
import org.apache.hadoop.io.Text;
|
||||
import org.apache.hadoop.security.AccessControlException;
|
||||
import org.apache.hadoop.security.UserGroupInformation;
|
||||
import org.apache.hadoop.security.UserGroupInformation.AuthenticationMethod;
|
||||
import org.apache.hadoop.security.token.SecretManager;
|
||||
import org.apache.hadoop.security.token.Token;
|
||||
import org.apache.hadoop.security.token.delegation.AbstractDelegationTokenSecretManager;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.lang.reflect.Constructor;
|
||||
|
||||
/**
|
||||
* Manager to hold underlying delegation token secret manager implementations.
|
||||
*/
|
||||
public class RouterSecurityManager {
|
||||
|
||||
private static final Logger LOG =
|
||||
LoggerFactory.getLogger(RouterSecurityManager.class);
|
||||
|
||||
private AbstractDelegationTokenSecretManager<DelegationTokenIdentifier>
|
||||
dtSecretManager = null;
|
||||
|
||||
public RouterSecurityManager(Configuration conf) {
|
||||
this.dtSecretManager = newSecretManager(conf);
|
||||
}
|
||||
|
||||
@VisibleForTesting
|
||||
public RouterSecurityManager(AbstractDelegationTokenSecretManager
|
||||
<DelegationTokenIdentifier> dtSecretManager) {
|
||||
this.dtSecretManager = dtSecretManager;
|
||||
}
|
||||
|
||||
/**
|
||||
* Creates an instance of a SecretManager from the configuration.
|
||||
*
|
||||
* @param conf Configuration that defines the secret manager class.
|
||||
* @return New secret manager.
|
||||
*/
|
||||
public static AbstractDelegationTokenSecretManager<DelegationTokenIdentifier>
|
||||
newSecretManager(Configuration conf) {
|
||||
Class<? extends AbstractDelegationTokenSecretManager> clazz =
|
||||
conf.getClass(
|
||||
RBFConfigKeys.DFS_ROUTER_DELEGATION_TOKEN_DRIVER_CLASS,
|
||||
RBFConfigKeys.DFS_ROUTER_DELEGATION_TOKEN_DRIVER_CLASS_DEFAULT,
|
||||
AbstractDelegationTokenSecretManager.class);
|
||||
AbstractDelegationTokenSecretManager secretManager;
|
||||
try {
|
||||
Constructor constructor = clazz.getConstructor(Configuration.class);
|
||||
secretManager = (AbstractDelegationTokenSecretManager)
|
||||
constructor.newInstance(conf);
|
||||
LOG.info("Delegation token secret manager object instantiated");
|
||||
} catch (ReflectiveOperationException e) {
|
||||
LOG.error("Could not instantiate: {}", clazz.getSimpleName(), e);
|
||||
return null;
|
||||
} catch (RuntimeException e) {
|
||||
LOG.error("RuntimeException to instantiate: {}",
|
||||
clazz.getSimpleName(), e);
|
||||
return null;
|
||||
}
|
||||
return secretManager;
|
||||
}
|
||||
|
||||
public AbstractDelegationTokenSecretManager<DelegationTokenIdentifier>
|
||||
getSecretManager() {
|
||||
return this.dtSecretManager;
|
||||
}
|
||||
|
||||
public void stop() {
|
||||
LOG.info("Stopping security manager");
|
||||
if(this.dtSecretManager != null) {
|
||||
this.dtSecretManager.stopThreads();
|
||||
}
|
||||
}
|
||||
|
||||
private static UserGroupInformation getRemoteUser() throws IOException {
|
||||
return RouterRpcServer.getRemoteUser();
|
||||
}
|
||||
/**
|
||||
* Returns authentication method used to establish the connection.
|
||||
* @return AuthenticationMethod used to establish connection.
|
||||
* @throws IOException
|
||||
*/
|
||||
private UserGroupInformation.AuthenticationMethod
|
||||
getConnectionAuthenticationMethod() throws IOException {
|
||||
UserGroupInformation ugi = getRemoteUser();
|
||||
UserGroupInformation.AuthenticationMethod authMethod
|
||||
= ugi.getAuthenticationMethod();
|
||||
if (authMethod == UserGroupInformation.AuthenticationMethod.PROXY) {
|
||||
authMethod = ugi.getRealUser().getAuthenticationMethod();
|
||||
}
|
||||
return authMethod;
|
||||
}
|
||||
|
||||
/**
|
||||
*
|
||||
* @return true if delegation token operation is allowed
|
||||
*/
|
||||
private boolean isAllowedDelegationTokenOp() throws IOException {
|
||||
AuthenticationMethod authMethod = getConnectionAuthenticationMethod();
|
||||
if (UserGroupInformation.isSecurityEnabled()
|
||||
&& (authMethod != AuthenticationMethod.KERBEROS)
|
||||
&& (authMethod != AuthenticationMethod.KERBEROS_SSL)
|
||||
&& (authMethod != AuthenticationMethod.CERTIFICATE)) {
|
||||
return false;
|
||||
}
|
||||
return true;
|
||||
}
|
||||
|
||||
/**
|
||||
* @param renewer Renewer information
|
||||
* @return delegation token
|
||||
* @throws IOException on error
|
||||
*/
|
||||
public Token<DelegationTokenIdentifier> getDelegationToken(Text renewer)
|
||||
throws IOException {
|
||||
LOG.debug("Generate delegation token with renewer " + renewer);
|
||||
final String operationName = "getDelegationToken";
|
||||
boolean success = false;
|
||||
String tokenId = "";
|
||||
Token<DelegationTokenIdentifier> token;
|
||||
try {
|
||||
if (!isAllowedDelegationTokenOp()) {
|
||||
throw new IOException(
|
||||
"Delegation Token can be issued only " +
|
||||
"with kerberos or web authentication");
|
||||
}
|
||||
if (dtSecretManager == null || !dtSecretManager.isRunning()) {
|
||||
LOG.warn("trying to get DT with no secret manager running");
|
||||
return null;
|
||||
}
|
||||
UserGroupInformation ugi = getRemoteUser();
|
||||
String user = ugi.getUserName();
|
||||
Text owner = new Text(user);
|
||||
Text realUser = null;
|
||||
if (ugi.getRealUser() != null) {
|
||||
realUser = new Text(ugi.getRealUser().getUserName());
|
||||
}
|
||||
DelegationTokenIdentifier dtId = new DelegationTokenIdentifier(owner,
|
||||
renewer, realUser);
|
||||
token = new Token<DelegationTokenIdentifier>(
|
||||
dtId, dtSecretManager);
|
||||
tokenId = dtId.toStringStable();
|
||||
success = true;
|
||||
} finally {
|
||||
logAuditEvent(success, operationName, tokenId);
|
||||
}
|
||||
return token;
|
||||
}
|
||||
|
||||
public long renewDelegationToken(Token<DelegationTokenIdentifier> token)
|
||||
throws SecretManager.InvalidToken, IOException {
|
||||
LOG.debug("Renew delegation token");
|
||||
final String operationName = "renewDelegationToken";
|
||||
boolean success = false;
|
||||
String tokenId = "";
|
||||
long expiryTime;
|
||||
try {
|
||||
if (!isAllowedDelegationTokenOp()) {
|
||||
throw new IOException(
|
||||
"Delegation Token can be renewed only " +
|
||||
"with kerberos or web authentication");
|
||||
}
|
||||
String renewer = getRemoteUser().getShortUserName();
|
||||
expiryTime = dtSecretManager.renewToken(token, renewer);
|
||||
final DelegationTokenIdentifier id = DFSUtil.decodeDelegationToken(token);
|
||||
tokenId = id.toStringStable();
|
||||
success = true;
|
||||
} catch (AccessControlException ace) {
|
||||
final DelegationTokenIdentifier id = DFSUtil.decodeDelegationToken(token);
|
||||
tokenId = id.toStringStable();
|
||||
throw ace;
|
||||
} finally {
|
||||
logAuditEvent(success, operationName, tokenId);
|
||||
}
|
||||
return expiryTime;
|
||||
}
|
||||
|
||||
public void cancelDelegationToken(Token<DelegationTokenIdentifier> token)
|
||||
throws IOException {
|
||||
LOG.debug("Cancel delegation token");
|
||||
final String operationName = "cancelDelegationToken";
|
||||
boolean success = false;
|
||||
String tokenId = "";
|
||||
try {
|
||||
String canceller = getRemoteUser().getUserName();
|
||||
LOG.info("Cancel request by " + canceller);
|
||||
DelegationTokenIdentifier id =
|
||||
dtSecretManager.cancelToken(token, canceller);
|
||||
tokenId = id.toStringStable();
|
||||
success = true;
|
||||
} catch (AccessControlException ace) {
|
||||
final DelegationTokenIdentifier id = DFSUtil.decodeDelegationToken(token);
|
||||
tokenId = id.toStringStable();
|
||||
throw ace;
|
||||
} finally {
|
||||
logAuditEvent(success, operationName, tokenId);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Log status of delegation token related operation.
|
||||
* Extend in future to use audit logger instead of local logging.
|
||||
*/
|
||||
void logAuditEvent(boolean succeeded, String cmd, String tokenId)
|
||||
throws IOException {
|
||||
LOG.debug(
|
||||
"Operation:" + cmd +
|
||||
" Status:" + succeeded +
|
||||
" TokenId:" + tokenId);
|
||||
}
|
||||
}
|
|
@ -0,0 +1,28 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
/**
|
||||
* Includes router security manager and token store implementations.
|
||||
*/
|
||||
@InterfaceAudience.Private
|
||||
@InterfaceStability.Evolving
|
||||
|
||||
package org.apache.hadoop.hdfs.server.federation.router.security;
|
||||
|
||||
import org.apache.hadoop.classification.InterfaceAudience;
|
||||
import org.apache.hadoop.classification.InterfaceStability;
|
|
@ -0,0 +1,56 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.hdfs.server.federation.router.security.token;
|
||||
|
||||
import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier;
|
||||
import org.apache.hadoop.security.token.delegation.AbstractDelegationTokenIdentifier;
|
||||
import org.apache.hadoop.security.token.delegation.ZKDelegationTokenSecretManager;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
|
||||
import java.io.IOException;
|
||||
|
||||
/**
|
||||
* Zookeeper based router delegation token store implementation.
|
||||
*/
|
||||
public class ZKDelegationTokenSecretManagerImpl extends
|
||||
ZKDelegationTokenSecretManager<AbstractDelegationTokenIdentifier> {
|
||||
|
||||
private static final Logger LOG =
|
||||
LoggerFactory.getLogger(ZKDelegationTokenSecretManagerImpl.class);
|
||||
|
||||
private Configuration conf = null;
|
||||
|
||||
public ZKDelegationTokenSecretManagerImpl(Configuration conf) {
|
||||
super(conf);
|
||||
this.conf = conf;
|
||||
try {
|
||||
super.startThreads();
|
||||
} catch (IOException e) {
|
||||
LOG.error("Error starting threads for zkDelegationTokens ");
|
||||
}
|
||||
LOG.info("Zookeeper delegation token secret manager instantiated");
|
||||
}
|
||||
|
||||
@Override
|
||||
public DelegationTokenIdentifier createIdentifier() {
|
||||
return new DelegationTokenIdentifier();
|
||||
}
|
||||
}
|
|
@ -0,0 +1,29 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
/**
|
||||
* Includes implementations of token secret managers.
|
||||
* Implementations should extend {@link AbstractDelegationTokenSecretManager}.
|
||||
*/
|
||||
@InterfaceAudience.Private
|
||||
@InterfaceStability.Evolving
|
||||
|
||||
package org.apache.hadoop.hdfs.server.federation.router.security.token;
|
||||
|
||||
import org.apache.hadoop.classification.InterfaceAudience;
|
||||
import org.apache.hadoop.classification.InterfaceStability;
|
|
@ -584,4 +584,13 @@
|
|||
</description>
|
||||
</property>
|
||||
|
||||
</configuration>
|
||||
<property>
|
||||
<name>dfs.federation.router.secret.manager.class</name>
|
||||
<value>org.apache.hadoop.hdfs.server.federation.router.security.token.ZKDelegationTokenSecretManagerImpl</value>
|
||||
<description>
|
||||
Class to implement state store to delegation tokens.
|
||||
Default implementation uses zookeeper as the backend to store delegation tokens.
|
||||
</description>
|
||||
</property>
|
||||
|
||||
</configuration>
|
||||
|
|
|
@ -31,6 +31,7 @@ import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_
|
|||
import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_KERBEROS_PRINCIPAL_KEY;
|
||||
import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_KEYTAB_FILE_KEY;
|
||||
import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_RPC_BIND_HOST_KEY;
|
||||
import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_DELEGATION_TOKEN_DRIVER_CLASS;
|
||||
import static org.junit.Assert.assertTrue;
|
||||
|
||||
import java.io.File;
|
||||
|
@ -43,6 +44,7 @@ import org.apache.hadoop.hdfs.HdfsConfiguration;
|
|||
import org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys;
|
||||
import org.apache.hadoop.hdfs.server.federation.store.driver.StateStoreDriver;
|
||||
import org.apache.hadoop.hdfs.server.federation.store.driver.impl.StateStoreFileImpl;
|
||||
import org.apache.hadoop.hdfs.server.federation.security.MockDelegationTokenSecretManager;
|
||||
import org.apache.hadoop.http.HttpConfig;
|
||||
import org.apache.hadoop.minikdc.MiniKdc;
|
||||
import org.apache.hadoop.security.SecurityUtil;
|
||||
|
@ -144,6 +146,8 @@ public final class SecurityConfUtil {
|
|||
|
||||
// We need to specify the host to prevent 0.0.0.0 as the host address
|
||||
conf.set(DFS_ROUTER_RPC_BIND_HOST_KEY, "localhost");
|
||||
conf.set(DFS_ROUTER_DELEGATION_TOKEN_DRIVER_CLASS,
|
||||
MockDelegationTokenSecretManager.class.getName());
|
||||
|
||||
return conf;
|
||||
}
|
||||
|
|
|
@ -0,0 +1,101 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.fs.contract.router;
|
||||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.fs.contract.AbstractFSContract;
|
||||
import org.apache.hadoop.fs.contract.AbstractFSContractTestBase;
|
||||
import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier;
|
||||
import org.apache.hadoop.security.token.SecretManager;
|
||||
import org.apache.hadoop.security.token.Token;
|
||||
import org.junit.AfterClass;
|
||||
import org.junit.BeforeClass;
|
||||
import org.junit.Rule;
|
||||
import org.junit.Test;
|
||||
import org.junit.rules.ExpectedException;
|
||||
|
||||
import java.io.IOException;
|
||||
import static org.apache.hadoop.fs.contract.router.SecurityConfUtil.initSecurity;
|
||||
|
||||
/**
|
||||
* Test to verify router contracts for delegation token operations.
|
||||
*/
|
||||
public class TestRouterHDFSContractDelegationToken
|
||||
extends AbstractFSContractTestBase {
|
||||
|
||||
@BeforeClass
|
||||
public static void createCluster() throws Exception {
|
||||
RouterHDFSContract.createCluster(initSecurity());
|
||||
}
|
||||
|
||||
@AfterClass
|
||||
public static void teardownCluster() throws IOException {
|
||||
RouterHDFSContract.destroyCluster();
|
||||
}
|
||||
|
||||
@Override
|
||||
protected AbstractFSContract createContract(Configuration conf) {
|
||||
return new RouterHDFSContract(conf);
|
||||
}
|
||||
|
||||
@Rule
|
||||
public ExpectedException exceptionRule = ExpectedException.none();
|
||||
|
||||
@Test
|
||||
public void testRouterDelegationToken() throws Exception {
|
||||
// Generate delegation token
|
||||
Token<DelegationTokenIdentifier> token =
|
||||
(Token<DelegationTokenIdentifier>) getFileSystem()
|
||||
.getDelegationToken("router");
|
||||
assertNotNull(token);
|
||||
// Verify properties of the token
|
||||
assertEquals("HDFS_DELEGATION_TOKEN", token.getKind().toString());
|
||||
DelegationTokenIdentifier identifier = token.decodeIdentifier();
|
||||
assertNotNull(identifier);
|
||||
String owner = identifier.getOwner().toString();
|
||||
// Windows will not reverse name lookup "127.0.0.1" to "localhost".
|
||||
String host = Path.WINDOWS ? "127.0.0.1" : "localhost";
|
||||
String expectedOwner = "router/"+ host + "@EXAMPLE.COM";
|
||||
assertEquals(expectedOwner, owner);
|
||||
assertEquals("router", identifier.getRenewer().toString());
|
||||
int masterKeyId = identifier.getMasterKeyId();
|
||||
assertTrue(masterKeyId > 0);
|
||||
int sequenceNumber = identifier.getSequenceNumber();
|
||||
assertTrue(sequenceNumber > 0);
|
||||
long existingMaxTime = token.decodeIdentifier().getMaxDate();
|
||||
assertTrue(identifier.getMaxDate() >= identifier.getIssueDate());
|
||||
|
||||
// Renew delegation token
|
||||
token.renew(initSecurity());
|
||||
assertNotNull(token);
|
||||
assertTrue(token.decodeIdentifier().getMaxDate() >= existingMaxTime);
|
||||
// Renewal should retain old master key id and sequence number
|
||||
identifier = token.decodeIdentifier();
|
||||
assertEquals(identifier.getMasterKeyId(), masterKeyId);
|
||||
assertEquals(identifier.getSequenceNumber(), sequenceNumber);
|
||||
|
||||
// Cancel delegation token
|
||||
token.cancel(initSecurity());
|
||||
|
||||
// Renew a cancelled token
|
||||
exceptionRule.expect(SecretManager.InvalidToken.class);
|
||||
token.renew(initSecurity());
|
||||
}
|
||||
}
|
|
@ -0,0 +1,52 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.hdfs.server.federation.security;
|
||||
|
||||
import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier;
|
||||
import org.apache.hadoop.security.token.delegation.AbstractDelegationTokenSecretManager;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import java.io.IOException;
|
||||
|
||||
/**
|
||||
* Mock functionality of AbstractDelegationTokenSecretManager.
|
||||
* for testing
|
||||
*/
|
||||
public class MockDelegationTokenSecretManager
|
||||
extends AbstractDelegationTokenSecretManager<DelegationTokenIdentifier> {
|
||||
|
||||
public MockDelegationTokenSecretManager(
|
||||
long delegationKeyUpdateInterval,
|
||||
long delegationTokenMaxLifetime,
|
||||
long delegationTokenRenewInterval,
|
||||
long delegationTokenRemoverScanInterval) {
|
||||
super(delegationKeyUpdateInterval, delegationTokenMaxLifetime,
|
||||
delegationTokenRenewInterval, delegationTokenRemoverScanInterval);
|
||||
}
|
||||
|
||||
public MockDelegationTokenSecretManager(Configuration conf)
|
||||
throws IOException {
|
||||
super(100000, 100000, 100000, 100000);
|
||||
this.startThreads();
|
||||
}
|
||||
|
||||
@Override
|
||||
public DelegationTokenIdentifier createIdentifier() {
|
||||
return new DelegationTokenIdentifier();
|
||||
}
|
||||
}
|
|
@ -0,0 +1,93 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.hdfs.server.federation.security;
|
||||
|
||||
import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier;
|
||||
import org.apache.hadoop.hdfs.server.federation.router.security.RouterSecurityManager;
|
||||
import org.apache.hadoop.io.Text;
|
||||
import org.apache.hadoop.security.UserGroupInformation;
|
||||
import org.apache.hadoop.security.token.SecretManager;
|
||||
import org.apache.hadoop.security.token.Token;
|
||||
import org.apache.hadoop.security.token.delegation.AbstractDelegationTokenSecretManager;
|
||||
import org.junit.rules.ExpectedException;
|
||||
import org.junit.BeforeClass;
|
||||
import org.junit.Rule;
|
||||
import org.junit.Test;
|
||||
import static org.junit.Assert.assertTrue;
|
||||
import static org.junit.Assert.assertNotNull;
|
||||
|
||||
import java.io.IOException;
|
||||
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
|
||||
/**
|
||||
* Test functionality of {@link RouterSecurityManager}, which manages
|
||||
* delegation tokens for router.
|
||||
*/
|
||||
public class TestRouterSecurityManager {
|
||||
|
||||
private static final Logger LOG =
|
||||
LoggerFactory.getLogger(TestRouterSecurityManager.class);
|
||||
|
||||
private static RouterSecurityManager securityManager = null;
|
||||
|
||||
@BeforeClass
|
||||
public static void createMockSecretManager() throws IOException {
|
||||
AbstractDelegationTokenSecretManager<DelegationTokenIdentifier>
|
||||
mockDelegationTokenSecretManager =
|
||||
new MockDelegationTokenSecretManager(100, 100, 100, 100);
|
||||
mockDelegationTokenSecretManager.startThreads();
|
||||
securityManager =
|
||||
new RouterSecurityManager(mockDelegationTokenSecretManager);
|
||||
}
|
||||
|
||||
@Rule
|
||||
public ExpectedException exceptionRule = ExpectedException.none();
|
||||
|
||||
@Test
|
||||
public void testDelegationTokens() throws IOException {
|
||||
String[] groupsForTesting = new String[1];
|
||||
groupsForTesting[0] = "router_group";
|
||||
UserGroupInformation.setLoginUser(UserGroupInformation
|
||||
.createUserForTesting("router", groupsForTesting));
|
||||
|
||||
// Get a delegation token
|
||||
Token<DelegationTokenIdentifier> token =
|
||||
securityManager.getDelegationToken(new Text("some_renewer"));
|
||||
assertNotNull(token);
|
||||
|
||||
// Renew the delegation token
|
||||
UserGroupInformation.setLoginUser(UserGroupInformation
|
||||
.createUserForTesting("some_renewer", groupsForTesting));
|
||||
long updatedExpirationTime = securityManager.renewDelegationToken(token);
|
||||
assertTrue(updatedExpirationTime >= token.decodeIdentifier().getMaxDate());
|
||||
|
||||
// Cancel the delegation token
|
||||
securityManager.cancelDelegationToken(token);
|
||||
|
||||
String exceptionCause = "Renewal request for unknown token";
|
||||
exceptionRule.expect(SecretManager.InvalidToken.class);
|
||||
exceptionRule.expectMessage(exceptionCause);
|
||||
|
||||
// This throws an exception as token has been cancelled.
|
||||
securityManager.renewDelegationToken(token);
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue