YARN-11350. [Federation] Router Support DelegationToken With ZK. (#5131)

This commit is contained in:
slfan1989 2022-12-15 01:09:38 +08:00 committed by GitHub
parent 4de8791deb
commit 63b9a6a2b6
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
15 changed files with 1223 additions and 40 deletions

View File

@ -112,4 +112,39 @@ RouterRMTokenResponse removeStoredToken(RouterRMTokenRequest request)
*/
RouterRMTokenResponse getTokenByRouterStoreToken(RouterRMTokenRequest request)
throws YarnException, IOException;
/**
* The Router Supports incrementDelegationTokenSeqNum.
*
* @return DelegationTokenSeqNum.
*/
int incrementDelegationTokenSeqNum();
/**
* The Router Supports getDelegationTokenSeqNum.
*
* @return DelegationTokenSeqNum.
*/
int getDelegationTokenSeqNum();
/**
* The Router Supports setDelegationTokenSeqNum.
*
* @param seqNum DelegationTokenSeqNum.
*/
void setDelegationTokenSeqNum(int seqNum);
/**
* The Router Supports getCurrentKeyId.
*
* @return CurrentKeyId.
*/
int getCurrentKeyId();
/**
* The Router Supports incrementCurrentKeyId.
*
* @return CurrentKeyId.
*/
int incrementCurrentKeyId();
}

View File

@ -27,6 +27,7 @@
import java.util.Set;
import java.util.TimeZone;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import java.util.Comparator;
@ -110,6 +111,8 @@ public class MemoryFederationStateStore implements FederationStateStore {
private Map<String, SubClusterPolicyConfiguration> policies;
private RouterRMDTSecretManagerState routerRMSecretManagerState;
private int maxAppsInStateStore;
private AtomicInteger sequenceNum;
private AtomicInteger masterKeyId;
private final MonotonicClock clock = new MonotonicClock();
@ -126,6 +129,8 @@ public void init(Configuration conf) {
maxAppsInStateStore = conf.getInt(
YarnConfiguration.FEDERATION_STATESTORE_MAX_APPLICATIONS,
YarnConfiguration.DEFAULT_FEDERATION_STATESTORE_MAX_APPLICATIONS);
sequenceNum = new AtomicInteger();
masterKeyId = new AtomicInteger();
}
@Override
@ -534,6 +539,31 @@ public RouterRMTokenResponse getTokenByRouterStoreToken(RouterRMTokenRequest req
return RouterRMTokenResponse.newInstance(resultToken);
}
@Override
public int incrementDelegationTokenSeqNum() {
return sequenceNum.incrementAndGet();
}
@Override
public int getDelegationTokenSeqNum() {
return sequenceNum.get();
}
@Override
public void setDelegationTokenSeqNum(int seqNum) {
sequenceNum.set(seqNum);
}
@Override
public int getCurrentKeyId() {
return masterKeyId.get();
}
@Override
public int incrementCurrentKeyId() {
return masterKeyId.incrementAndGet();
}
private void storeOrUpdateRouterRMDT(RMDelegationTokenIdentifier rmDTIdentifier,
Long renewDate, boolean isUpdate) throws IOException {
Map<RMDelegationTokenIdentifier, Long> rmDTState = routerRMSecretManagerState.getTokenState();

View File

@ -1394,4 +1394,29 @@ public RouterRMTokenResponse getTokenByRouterStoreToken(RouterRMTokenRequest req
throws YarnException, IOException {
throw new NotImplementedException("Code is not implemented");
}
@Override
public int incrementDelegationTokenSeqNum() {
return 0;
}
@Override
public int getDelegationTokenSeqNum() {
return 0;
}
@Override
public void setDelegationTokenSeqNum(int seqNum) {
return;
}
@Override
public int getCurrentKeyId() {
return 0;
}
@Override
public int incrementCurrentKeyId() {
return 0;
}
}

View File

@ -89,6 +89,27 @@ public final class ZKFederationStateStoreOpDurations implements MetricsSource {
@Metric("Duration for a update reservation homeSubCluster call")
private MutableRate updateReservationHomeSubCluster;
@Metric("Duration for a store new master key call")
private MutableRate storeNewMasterKey;
@Metric("Duration for a remove new master key call")
private MutableRate removeStoredMasterKey;
@Metric("Duration for a get master key by delegation key call")
private MutableRate getMasterKeyByDelegationKey;
@Metric("Duration for a store new token call")
private MutableRate storeNewToken;
@Metric("Duration for a update stored token call")
private MutableRate updateStoredToken;
@Metric("Duration for a remove stored token call")
private MutableRate removeStoredToken;
@Metric("Duration for a get token by router store token call")
private MutableRate getTokenByRouterStoreToken;
protected static final MetricsInfo RECORD_INFO =
info("ZKFederationStateStoreOpDurations", "Durations of ZKFederationStateStore calls");
@ -187,4 +208,32 @@ public void addDeleteReservationHomeSubClusterDuration(long startTime, long endT
public void addUpdateReservationHomeSubClusterDuration(long startTime, long endTime) {
updateReservationHomeSubCluster.add(endTime - startTime);
}
public void addStoreNewMasterKeyDuration(long startTime, long endTime) {
storeNewMasterKey.add(endTime - startTime);
}
public void removeStoredMasterKeyDuration(long startTime, long endTime) {
removeStoredMasterKey.add(endTime - startTime);
}
public void getMasterKeyByDelegationKeyDuration(long startTime, long endTime) {
getMasterKeyByDelegationKey.add(endTime - startTime);
}
public void getStoreNewTokenDuration(long startTime, long endTime) {
storeNewToken.add(endTime - startTime);
}
public void updateStoredTokenDuration(long startTime, long endTime) {
updateStoredToken.add(endTime - startTime);
}
public void removeStoredTokenDuration(long startTime, long endTime) {
removeStoredToken.add(endTime - startTime);
}
public void getTokenByRouterStoreTokenDuration(long startTime, long endTime) {
getTokenByRouterStoreToken.add(endTime - startTime);
}
}

View File

@ -17,9 +17,12 @@
package org.apache.hadoop.yarn.server.federation.store.impl;
import static org.apache.hadoop.util.curator.ZKCuratorManager.getNodePath;
import java.io.IOException;
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.ByteArrayInputStream;
import java.io.DataInputStream;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Calendar;
import java.util.List;
@ -27,9 +30,11 @@
import java.util.Comparator;
import java.util.stream.Collectors;
import org.apache.commons.lang3.NotImplementedException;
import org.apache.curator.framework.recipes.shared.SharedCount;
import org.apache.curator.framework.recipes.shared.VersionedValue;
import org.apache.hadoop.classification.VisibleForTesting;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.security.token.delegation.DelegationKey;
import org.apache.hadoop.util.curator.ZKCuratorManager;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
@ -37,6 +42,7 @@
import org.apache.hadoop.yarn.federation.proto.YarnServerFederationProtos.SubClusterIdProto;
import org.apache.hadoop.yarn.federation.proto.YarnServerFederationProtos.SubClusterInfoProto;
import org.apache.hadoop.yarn.federation.proto.YarnServerFederationProtos.SubClusterPolicyConfigurationProto;
import org.apache.hadoop.yarn.security.client.YARNDelegationTokenIdentifier;
import org.apache.hadoop.yarn.server.federation.store.FederationStateStore;
import org.apache.hadoop.yarn.server.federation.store.records.AddApplicationHomeSubClusterRequest;
import org.apache.hadoop.yarn.server.federation.store.records.AddApplicationHomeSubClusterResponse;
@ -87,14 +93,18 @@
import org.apache.hadoop.yarn.server.federation.store.records.impl.pb.SubClusterIdPBImpl;
import org.apache.hadoop.yarn.server.federation.store.records.impl.pb.SubClusterInfoPBImpl;
import org.apache.hadoop.yarn.server.federation.store.records.impl.pb.SubClusterPolicyConfigurationPBImpl;
import org.apache.hadoop.yarn.server.federation.store.records.RouterMasterKey;
import org.apache.hadoop.yarn.server.federation.store.records.RouterStoreToken;
import org.apache.hadoop.yarn.server.federation.store.utils.FederationApplicationHomeSubClusterStoreInputValidator;
import org.apache.hadoop.yarn.server.federation.store.utils.FederationMembershipStateStoreInputValidator;
import org.apache.hadoop.yarn.server.federation.store.utils.FederationPolicyStoreInputValidator;
import org.apache.hadoop.yarn.server.federation.store.utils.FederationStateStoreUtils;
import org.apache.hadoop.yarn.server.federation.store.utils.FederationReservationHomeSubClusterStoreInputValidator;
import org.apache.hadoop.yarn.server.federation.store.utils.FederationRouterRMTokenInputValidator;
import org.apache.hadoop.yarn.server.records.Version;
import org.apache.hadoop.yarn.api.records.ReservationId;
import org.apache.hadoop.yarn.util.Clock;
import org.apache.hadoop.yarn.util.Records;
import org.apache.hadoop.yarn.util.SystemClock;
import org.apache.zookeeper.data.ACL;
import org.slf4j.Logger;
@ -103,11 +113,14 @@
import org.apache.hadoop.thirdparty.protobuf.InvalidProtocolBufferException;
import static org.apache.hadoop.yarn.server.federation.store.utils.FederationStateStoreUtils.filterHomeSubCluster;
import static org.apache.hadoop.security.token.delegation.ZKDelegationTokenSecretManager.ZK_DTSM_TOKEN_SEQNUM_BATCH_SIZE;
import static org.apache.hadoop.security.token.delegation.ZKDelegationTokenSecretManager.ZK_DTSM_TOKEN_SEQNUM_BATCH_SIZE_DEFAULT;
import static org.apache.hadoop.util.curator.ZKCuratorManager.getNodePath;
/**
* ZooKeeper implementation of {@link FederationStateStore}.
*
* The znode structure is as follows:
*
* ROOT_DIR_PATH
* |--- MEMBERSHIP
* | |----- SC1
@ -121,6 +134,14 @@
* |--- RESERVATION
* | |----- RESERVATION1
* | |----- RESERVATION2
* |--- ROUTER_RM_DT_SECRET_MANAGER_ROOT
* | |----- ROUTER_RM_DELEGATION_TOKENS_ROOT
* | | |----- RM_DELEGATION_TOKEN_1
* | | |----- RM_DELEGATION_TOKEN_2
* | | |----- RM_DELEGATION_TOKEN_3
* | |----- ROUTER_RM_DT_MASTER_KEYS_ROOT
* | | |----- DELEGATION_KEY_1
* | |----- ROUTER_RM_DT_SEQUENTIAL_NUMBER
*/
public class ZookeeperFederationStateStore implements FederationStateStore {
@ -132,9 +153,29 @@ public class ZookeeperFederationStateStore implements FederationStateStore {
private final static String ROOT_ZNODE_NAME_POLICY = "policies";
private final static String ROOT_ZNODE_NAME_RESERVATION = "reservation";
/** Store Delegation Token Node. */
private final static String ROUTER_RM_DT_SECRET_MANAGER_ROOT = "router_rm_dt_secret_manager_root";
private static final String ROUTER_RM_DT_MASTER_KEYS_ROOT_ZNODE_NAME =
"router_rm_dt_master_keys_root";
private static final String ROUTER_RM_DELEGATION_TOKENS_ROOT_ZNODE_NAME =
"router_rm_delegation_tokens_root";
private static final String ROUTER_RM_DT_SEQUENTIAL_NUMBER_ZNODE_NAME =
"router_rm_dt_sequential_number";
private static final String ROUTER_RM_DT_MASTER_KEY_ID_ZNODE_NAME =
"router_rm_dt_master_key_id";
private static final String ROUTER_RM_DELEGATION_KEY_PREFIX = "delegation_key_";
private static final String ROUTER_RM_DELEGATION_TOKEN_PREFIX = "rm_delegation_token_";
/** Interface to Zookeeper. */
private ZKCuratorManager zkManager;
/** Store sequenceNum. **/
private int seqNumBatchSize;
private int currentSeqNum;
private int currentMaxSeqNum;
private SharedCount delTokSeqCounter;
private SharedCount keyIdSeqCounter;
/** Directory to store the state store data. */
private String baseZNode;
@ -144,6 +185,13 @@ public class ZookeeperFederationStateStore implements FederationStateStore {
private String reservationsZNode;
private int maxAppsInStateStore;
/** Directory to store the delegation token data. **/
private String routerRMDTSecretManagerRoot;
private String routerRMDTMasterKeysRootPath;
private String routerRMDelegationTokensRootPath;
private String routerRMSequenceNumberPath;
private String routerRMMasterKeyIdPath;
private volatile Clock clock = SystemClock.getInstance();
@VisibleForTesting
@ -152,6 +200,7 @@ public class ZookeeperFederationStateStore implements FederationStateStore {
@Override
public void init(Configuration conf) throws YarnException {
LOG.info("Initializing ZooKeeper connection");
maxAppsInStateStore = conf.getInt(
@ -174,6 +223,17 @@ public void init(Configuration conf) throws YarnException {
policiesZNode = getNodePath(baseZNode, ROOT_ZNODE_NAME_POLICY);
reservationsZNode = getNodePath(baseZNode, ROOT_ZNODE_NAME_RESERVATION);
// delegation token znodes
routerRMDTSecretManagerRoot = getNodePath(baseZNode, ROUTER_RM_DT_SECRET_MANAGER_ROOT);
routerRMDTMasterKeysRootPath = getNodePath(routerRMDTSecretManagerRoot,
ROUTER_RM_DT_MASTER_KEYS_ROOT_ZNODE_NAME);
routerRMDelegationTokensRootPath = getNodePath(routerRMDTSecretManagerRoot,
ROUTER_RM_DELEGATION_TOKENS_ROOT_ZNODE_NAME);
routerRMSequenceNumberPath = getNodePath(routerRMDTSecretManagerRoot,
ROUTER_RM_DT_SEQUENTIAL_NUMBER_ZNODE_NAME);
routerRMMasterKeyIdPath = getNodePath(routerRMDTSecretManagerRoot,
ROUTER_RM_DT_MASTER_KEY_ID_ZNODE_NAME);
// Create base znode for each entity
try {
List<ACL> zkAcl = ZKCuratorManager.getZKAcls(conf);
@ -181,14 +241,68 @@ public void init(Configuration conf) throws YarnException {
zkManager.createRootDirRecursively(appsZNode, zkAcl);
zkManager.createRootDirRecursively(policiesZNode, zkAcl);
zkManager.createRootDirRecursively(reservationsZNode, zkAcl);
zkManager.createRootDirRecursively(routerRMDTSecretManagerRoot, zkAcl);
zkManager.createRootDirRecursively(routerRMDTMasterKeysRootPath, zkAcl);
zkManager.createRootDirRecursively(routerRMDelegationTokensRootPath, zkAcl);
} catch (Exception e) {
String errMsg = "Cannot create base directories: " + e.getMessage();
FederationStateStoreUtils.logAndThrowStoreException(LOG, errMsg);
}
// Distributed sequenceNum.
try {
seqNumBatchSize = conf.getInt(ZK_DTSM_TOKEN_SEQNUM_BATCH_SIZE,
ZK_DTSM_TOKEN_SEQNUM_BATCH_SIZE_DEFAULT);
delTokSeqCounter = new SharedCount(zkManager.getCurator(), routerRMSequenceNumberPath, 0);
if (delTokSeqCounter != null) {
delTokSeqCounter.start();
}
// the first batch range should be allocated during this starting window
// by calling the incrSharedCount
currentSeqNum = incrSharedCount(delTokSeqCounter, seqNumBatchSize);
currentMaxSeqNum = currentSeqNum + seqNumBatchSize;
LOG.info("Fetched initial range of seq num, from {} to {} ",
currentSeqNum + 1, currentMaxSeqNum);
} catch (Exception e) {
throw new YarnException("Could not start Sequence Counter.", e);
}
// Distributed masterKeyId.
try {
keyIdSeqCounter = new SharedCount(zkManager.getCurator(), routerRMMasterKeyIdPath, 0);
if (keyIdSeqCounter != null) {
keyIdSeqCounter.start();
}
} catch (Exception e) {
throw new YarnException("Could not start Master KeyId Counter", e);
}
}
@Override
public void close() throws Exception {
try {
if (delTokSeqCounter != null) {
delTokSeqCounter.close();
delTokSeqCounter = null;
}
} catch (Exception e) {
LOG.error("Could not Stop Delegation Token Counter.", e);
}
try {
if (keyIdSeqCounter != null) {
keyIdSeqCounter.close();
keyIdSeqCounter = null;
}
} catch (Exception e) {
LOG.error("Could not stop Master KeyId Counter.", e);
}
if (zkManager != null) {
zkManager.close();
}
@ -886,45 +1000,599 @@ public UpdateReservationHomeSubClusterResponse updateReservationHomeSubCluster(
return UpdateReservationHomeSubClusterResponse.newInstance();
}
/**
* ZookeeperFederationStateStore Supports Store NewMasterKey.
*
* @param request The request contains RouterMasterKey, which is an abstraction for DelegationKey
* @return routerMasterKeyResponse, the response contains the RouterMasterKey.
* @throws YarnException if the call to the state store is unsuccessful.
* @throws IOException An IO Error occurred.
*/
@Override
public RouterMasterKeyResponse storeNewMasterKey(RouterMasterKeyRequest request)
throws YarnException, IOException {
throw new NotImplementedException("Code is not implemented");
long start = clock.getTime();
// For the verification of the request, after passing the verification,
// the request and the internal objects will not be empty and can be used directly.
FederationRouterRMTokenInputValidator.validate(request);
// Parse the delegationKey from the request and get the ZK storage path.
DelegationKey delegationKey = convertMasterKeyToDelegationKey(request);
String nodeCreatePath = getMasterKeyZNodePathByDelegationKey(delegationKey);
LOG.debug("Storing RMDelegationKey_{}, ZkNodePath = {}.", delegationKey.getKeyId(),
nodeCreatePath);
// Write master key data to zk.
try (ByteArrayOutputStream os = new ByteArrayOutputStream();
DataOutputStream fsOut = new DataOutputStream(os)) {
delegationKey.write(fsOut);
put(nodeCreatePath, os.toByteArray(), false);
}
// Get the stored masterKey from zk.
RouterMasterKey masterKeyFromZK = getRouterMasterKeyFromZK(nodeCreatePath);
long end = clock.getTime();
opDurations.addStoreNewMasterKeyDuration(start, end);
return RouterMasterKeyResponse.newInstance(masterKeyFromZK);
}
/**
* ZookeeperFederationStateStore Supports Remove MasterKey.
*
* @param request The request contains RouterMasterKey, which is an abstraction for DelegationKey
* @return routerMasterKeyResponse, the response contains the RouterMasterKey.
* @throws YarnException if the call to the state store is unsuccessful.
* @throws IOException An IO Error occurred.
*/
@Override
public RouterMasterKeyResponse removeStoredMasterKey(RouterMasterKeyRequest request)
throws YarnException, IOException {
throw new NotImplementedException("Code is not implemented");
long start = clock.getTime();
// For the verification of the request, after passing the verification,
// the request and the internal objects will not be empty and can be used directly.
FederationRouterRMTokenInputValidator.validate(request);
try {
// Parse the delegationKey from the request and get the ZK storage path.
RouterMasterKey masterKey = request.getRouterMasterKey();
DelegationKey delegationKey = convertMasterKeyToDelegationKey(request);
String nodeRemovePath = getMasterKeyZNodePathByDelegationKey(delegationKey);
LOG.debug("Removing RMDelegationKey_{}, ZkNodePath = {}.", delegationKey.getKeyId(),
nodeRemovePath);
// Check if the path exists, Throws an exception if the path does not exist.
if (!exists(nodeRemovePath)) {
throw new YarnException("ZkNodePath = " + nodeRemovePath + " not exists!");
}
// try to remove masterKey.
zkManager.delete(nodeRemovePath);
long end = clock.getTime();
opDurations.removeStoredMasterKeyDuration(start, end);
return RouterMasterKeyResponse.newInstance(masterKey);
} catch (Exception e) {
throw new YarnException(e);
}
}
/**
* ZookeeperFederationStateStore Supports Remove MasterKey.
*
* @param request The request contains RouterMasterKey, which is an abstraction for DelegationKey
* @return routerMasterKeyResponse, the response contains the RouterMasterKey.
* @throws YarnException if the call to the state store is unsuccessful.
* @throws IOException An IO Error occurred.
*/
@Override
public RouterMasterKeyResponse getMasterKeyByDelegationKey(RouterMasterKeyRequest request)
throws YarnException, IOException {
throw new NotImplementedException("Code is not implemented");
long start = clock.getTime();
// For the verification of the request, after passing the verification,
// the request and the internal objects will not be empty and can be used directly.
FederationRouterRMTokenInputValidator.validate(request);
try {
// Parse the delegationKey from the request and get the ZK storage path.
DelegationKey delegationKey = convertMasterKeyToDelegationKey(request);
String nodePath = getMasterKeyZNodePathByDelegationKey(delegationKey);
// Check if the path exists, Throws an exception if the path does not exist.
if (!exists(nodePath)) {
throw new YarnException("ZkNodePath = " + nodePath + " not exists!");
}
// Get the stored masterKey from zk.
RouterMasterKey routerMasterKey = getRouterMasterKeyFromZK(nodePath);
long end = clock.getTime();
opDurations.getMasterKeyByDelegationKeyDuration(start, end);
return RouterMasterKeyResponse.newInstance(routerMasterKey);
} catch (Exception e) {
throw new YarnException(e);
}
}
/**
* Get MasterKeyZNodePath based on DelegationKey.
*
* @param delegationKey delegationKey.
* @return masterKey ZNodePath.
*/
private String getMasterKeyZNodePathByDelegationKey(DelegationKey delegationKey) {
return getMasterKeyZNodePathByKeyId(delegationKey.getKeyId());
}
/**
* Get MasterKeyZNodePath based on KeyId.
*
* @param keyId master key id.
* @return masterKey ZNodePath.
*/
private String getMasterKeyZNodePathByKeyId(int keyId) {
String nodeName = ROUTER_RM_DELEGATION_KEY_PREFIX + keyId;
return getNodePath(routerRMDTMasterKeysRootPath, nodeName);
}
/**
* Get RouterMasterKey from ZK.
*
* @param nodePath The path where masterKey is stored in zk.
*
* @return RouterMasterKey.
* @throws IOException An IO Error occurred.
*/
private RouterMasterKey getRouterMasterKeyFromZK(String nodePath)
throws IOException {
try {
byte[] data = get(nodePath);
if ((data == null) || (data.length == 0)) {
return null;
}
ByteArrayInputStream bin = new ByteArrayInputStream(data);
DataInputStream din = new DataInputStream(bin);
DelegationKey key = new DelegationKey();
key.readFields(din);
return RouterMasterKey.newInstance(key.getKeyId(),
ByteBuffer.wrap(key.getEncodedKey()), key.getExpiryDate());
} catch (Exception ex) {
LOG.error("No node in path {}.", nodePath);
throw new IOException(ex);
}
}
/**
* ZookeeperFederationStateStore Supports Store RMDelegationTokenIdentifier.
*
* The stored token method is a synchronized method
* used to ensure that storeNewToken is a thread-safe method.
*
* @param request The request contains RouterRMToken (RMDelegationTokenIdentifier and renewDate)
* @return routerRMTokenResponse, the response contains the RouterStoreToken.
* @throws YarnException if the call to the state store is unsuccessful.
* @throws IOException An IO Error occurred.
*/
@Override
public RouterRMTokenResponse storeNewToken(RouterRMTokenRequest request)
throws YarnException, IOException {
throw new NotImplementedException("Code is not implemented");
long start = clock.getTime();
// We verify the RouterRMTokenRequest to ensure that the request is not empty,
// and that the internal RouterStoreToken is not empty.
FederationRouterRMTokenInputValidator.validate(request);
try {
// add delegationToken
storeOrUpdateRouterRMDT(request, false);
// Get the stored delegationToken from ZK and return.
RouterStoreToken resultStoreToken = getStoreTokenFromZK(request);
long end = clock.getTime();
opDurations.getStoreNewTokenDuration(start, end);
return RouterRMTokenResponse.newInstance(resultStoreToken);
} catch (YarnException | IOException e) {
throw e;
} catch (Exception e) {
throw new YarnException(e);
}
}
/**
* ZookeeperFederationStateStore Supports Update RMDelegationTokenIdentifier.
*
* The update stored token method is a synchronized method
* used to ensure that storeNewToken is a thread-safe method.
*
* @param request The request contains RouterRMToken (RMDelegationTokenIdentifier and renewDate)
* @return routerRMTokenResponse, the response contains the RouterStoreToken.
* @throws YarnException if the call to the state store is unsuccessful.
* @throws IOException An IO Error occurred.
*/
@Override
public RouterRMTokenResponse updateStoredToken(RouterRMTokenRequest request)
throws YarnException, IOException {
throw new NotImplementedException("Code is not implemented");
long start = clock.getTime();
// We verify the RouterRMTokenRequest to ensure that the request is not empty,
// and that the internal RouterStoreToken is not empty.
FederationRouterRMTokenInputValidator.validate(request);
try {
// get the Token storage path
String nodePath = getStoreTokenZNodePathByTokenRequest(request);
// updateStoredToken needs to determine whether the zkNode exists.
// If it exists, update the token data.
// If it does not exist, write the new token data directly.
boolean pathExists = true;
if (!exists(nodePath)) {
pathExists = false;
}
if (pathExists) {
// update delegationToken
storeOrUpdateRouterRMDT(request, true);
} else {
// add new delegationToken
storeNewToken(request);
}
// Get the stored delegationToken from ZK and return.
RouterStoreToken resultStoreToken = getStoreTokenFromZK(request);
long end = clock.getTime();
opDurations.updateStoredTokenDuration(start, end);
return RouterRMTokenResponse.newInstance(resultStoreToken);
} catch (YarnException | IOException e) {
throw e;
} catch (Exception e) {
throw new YarnException(e);
}
}
/**
* ZookeeperFederationStateStore Supports Remove RMDelegationTokenIdentifier.
*
* The remove stored token method is a synchronized method
* used to ensure that storeNewToken is a thread-safe method.
*
* @param request The request contains RouterRMToken (RMDelegationTokenIdentifier and renewDate)
* @return routerRMTokenResponse, the response contains the RouterStoreToken.
* @throws YarnException if the call to the state store is unsuccessful.
* @throws IOException An IO Error occurred.
*/
@Override
public RouterRMTokenResponse removeStoredToken(RouterRMTokenRequest request)
throws YarnException, IOException {
throw new NotImplementedException("Code is not implemented");
long start = clock.getTime();
// We verify the RouterRMTokenRequest to ensure that the request is not empty,
// and that the internal RouterStoreToken is not empty.
FederationRouterRMTokenInputValidator.validate(request);
try {
// get the Token storage path
String nodePath = getStoreTokenZNodePathByTokenRequest(request);
// If the path to be deleted does not exist, throw an exception directly.
if (!exists(nodePath)) {
throw new YarnException("ZkNodePath = " + nodePath + " not exists!");
}
// Check again, first get the data from ZK,
// if the data is not empty, then delete it
RouterStoreToken storeToken = getStoreTokenFromZK(request);
if (storeToken != null) {
zkManager.delete(nodePath);
}
// return deleted token data.
long end = clock.getTime();
opDurations.removeStoredTokenDuration(start, end);
return RouterRMTokenResponse.newInstance(storeToken);
} catch (YarnException | IOException e) {
throw e;
} catch (Exception e) {
throw new YarnException(e);
}
}
/**
* The Router Supports GetTokenByRouterStoreToken.
*
* @param request The request contains RouterRMToken (RMDelegationTokenIdentifier and renewDate)
* @return RouterRMTokenResponse.
* @throws YarnException if the call to the state store is unsuccessful
* @throws IOException An IO Error occurred
*/
@Override
public RouterRMTokenResponse getTokenByRouterStoreToken(RouterRMTokenRequest request)
throws YarnException, IOException {
throw new NotImplementedException("Code is not implemented");
long start = clock.getTime();
// We verify the RouterRMTokenRequest to ensure that the request is not empty,
// and that the internal RouterStoreToken is not empty.
FederationRouterRMTokenInputValidator.validate(request);
try {
// Before get the token,
// we need to determine whether the path where the token is stored exists.
// If it doesn't exist, we will throw an exception.
String nodePath = getStoreTokenZNodePathByTokenRequest(request);
if (!exists(nodePath)) {
throw new YarnException("ZkNodePath = " + nodePath + " not exists!");
}
// Get the stored delegationToken from ZK and return.
RouterStoreToken resultStoreToken = getStoreTokenFromZK(request);
// return deleted token data.
long end = clock.getTime();
opDurations.getTokenByRouterStoreTokenDuration(start, end);
return RouterRMTokenResponse.newInstance(resultStoreToken);
} catch (YarnException | IOException e) {
throw e;
} catch (Exception e) {
throw new YarnException(e);
}
}
/**
* Convert MasterKey to DelegationKey.
*
* Before using this function,
* please use FederationRouterRMTokenInputValidator to verify the request.
* By default, the request is not empty, and the internal object is not empty.
*
* @param request RouterMasterKeyRequest
* @return DelegationKey.
*/
private DelegationKey convertMasterKeyToDelegationKey(RouterMasterKeyRequest request) {
RouterMasterKey masterKey = request.getRouterMasterKey();
return convertMasterKeyToDelegationKey(masterKey);
}
/**
* Convert MasterKey to DelegationKey.
*
* @param masterKey masterKey.
* @return DelegationKey.
*/
private DelegationKey convertMasterKeyToDelegationKey(RouterMasterKey masterKey) {
ByteBuffer keyByteBuf = masterKey.getKeyBytes();
byte[] keyBytes = new byte[keyByteBuf.remaining()];
keyByteBuf.get(keyBytes);
return new DelegationKey(masterKey.getKeyId(), masterKey.getExpiryDate(), keyBytes);
}
/**
* Check if a path exists in zk.
*
* @param path Path to be checked.
* @return Returns true if the path exists, false if the path does not exist.
* @throws Exception When an exception to access zk occurs.
*/
@VisibleForTesting
boolean exists(final String path) throws Exception {
return zkManager.exists(path);
}
/**
* Add or update delegationToken.
*
* Before using this function,
* please use FederationRouterRMTokenInputValidator to verify the request.
* By default, the request is not empty, and the internal object is not empty.
*
* @param request storeToken
* @param isUpdate true, update the token; false, create a new token.
* @throws Exception exception occurs.
*/
private void storeOrUpdateRouterRMDT(RouterRMTokenRequest request, boolean isUpdate)
throws Exception {
RouterStoreToken routerStoreToken = request.getRouterStoreToken();
String nodeCreatePath = getStoreTokenZNodePathByTokenRequest(request);
LOG.debug("nodeCreatePath = {}, isUpdate = {}", nodeCreatePath, isUpdate);
put(nodeCreatePath, routerStoreToken.toByteArray(), isUpdate);
}
/**
* Get ZNode Path of StoreToken.
*
* Before using this method, we should use FederationRouterRMTokenInputValidator
* to verify the request,ensure that the request is not empty,
* and ensure that the object in the request is not empty.
*
* @param request RouterMasterKeyRequest.
* @return RouterRMToken ZNode Path.
* @throws IOException io exception occurs.
*/
private String getStoreTokenZNodePathByTokenRequest(RouterRMTokenRequest request)
throws IOException {
RouterStoreToken routerStoreToken = request.getRouterStoreToken();
YARNDelegationTokenIdentifier identifier = routerStoreToken.getTokenIdentifier();
return getStoreTokenZNodePathByIdentifier(identifier);
}
/**
* Get ZNode Path of StoreToken.
*
* @param identifier YARNDelegationTokenIdentifier
* @return RouterRMToken ZNode Path.
*/
private String getStoreTokenZNodePathByIdentifier(YARNDelegationTokenIdentifier identifier) {
String nodePath = getNodePath(routerRMDelegationTokensRootPath,
ROUTER_RM_DELEGATION_TOKEN_PREFIX + identifier.getSequenceNumber());
return nodePath;
}
/**
* Get RouterStoreToken from ZK.
*
* @param request RouterMasterKeyRequest.
* @return RouterStoreToken.
* @throws IOException io exception occurs.
*/
private RouterStoreToken getStoreTokenFromZK(RouterRMTokenRequest request) throws IOException {
RouterStoreToken routerStoreToken = request.getRouterStoreToken();
YARNDelegationTokenIdentifier identifier = routerStoreToken.getTokenIdentifier();
return getStoreTokenFromZK(identifier);
}
/**
* Get RouterStoreToken from ZK.
*
* @param identifier YARN DelegationToken Identifier.
* @return RouterStoreToken.
* @throws IOException io exception occurs.
*/
private RouterStoreToken getStoreTokenFromZK(YARNDelegationTokenIdentifier identifier)
throws IOException {
// get the Token storage path
String nodePath = getStoreTokenZNodePathByIdentifier(identifier);
return getStoreTokenFromZK(nodePath);
}
/**
* Get RouterStoreToken from ZK.
*
* @param nodePath Znode location where data is stored.
* @return RouterStoreToken.
* @throws IOException io exception occurs.
*/
private RouterStoreToken getStoreTokenFromZK(String nodePath)
throws IOException {
try {
byte[] data = get(nodePath);
if ((data == null) || (data.length == 0)) {
return null;
}
ByteArrayInputStream bin = new ByteArrayInputStream(data);
DataInputStream din = new DataInputStream(bin);
RouterStoreToken storeToken = Records.newRecord(RouterStoreToken.class);
storeToken.readFields(din);
return storeToken;
} catch (Exception ex) {
LOG.error("No node in path [{}]", nodePath, ex);
throw new IOException(ex);
}
}
/**
* Increase SequenceNum. For zk, this is a distributed value.
* To ensure data consistency, we will use the synchronized keyword.
*
* For ZookeeperFederationStateStore, in order to reduce the interaction with ZK,
* we will apply for SequenceNum from ZK in batches(Apply
* when currentSeqNum &gt;= currentMaxSeqNum),
* and assign this value to the variable currentMaxSeqNum.
*
* When calling the method incrementDelegationTokenSeqNum,
* if currentSeqNum &lt; currentMaxSeqNum, we return ++currentMaxSeqNum,
* When currentSeqNum &gt;= currentMaxSeqNum, we re-apply SequenceNum from zk.
*
* @return SequenceNum.
*/
@Override
public int incrementDelegationTokenSeqNum() {
// The secret manager will keep a local range of seq num which won't be
// seen by peers, so only when the range is exhausted it will ask zk for
// another range again
if (currentSeqNum >= currentMaxSeqNum) {
try {
// after a successful batch request, we can get the range starting point
currentSeqNum = incrSharedCount(delTokSeqCounter, seqNumBatchSize);
currentMaxSeqNum = currentSeqNum + seqNumBatchSize;
LOG.info("Fetched new range of seq num, from {} to {} ",
currentSeqNum + 1, currentMaxSeqNum);
} catch (InterruptedException e) {
// The ExpirationThread is just finishing.. so don't do anything..
LOG.debug("Thread interrupted while performing token counter increment", e);
Thread.currentThread().interrupt();
} catch (Exception e) {
throw new RuntimeException("Could not increment shared counter !!", e);
}
}
return ++currentSeqNum;
}
/**
* Increment the value of the shared variable.
*
* @param sharedCount zk SharedCount.
* @param batchSize batch size.
* @return new SequenceNum.
* @throws Exception exception occurs.
*/
private int incrSharedCount(SharedCount sharedCount, int batchSize)
throws Exception {
while (true) {
// Loop until we successfully increment the counter
VersionedValue<Integer> versionedValue = sharedCount.getVersionedValue();
if (sharedCount.trySetCount(versionedValue, versionedValue.getValue() + batchSize)) {
return versionedValue.getValue();
}
}
}
/**
* Get DelegationToken SeqNum.
*
* @return delegationTokenSeqNum.
*/
@Override
public int getDelegationTokenSeqNum() {
return delTokSeqCounter.getCount();
}
/**
* Set DelegationToken SeqNum.
*
* @param seqNum sequenceNum.
*/
@Override
public void setDelegationTokenSeqNum(int seqNum) {
try {
delTokSeqCounter.setCount(seqNum);
} catch (Exception e) {
throw new RuntimeException("Could not set shared counter !!", e);
}
}
/**
* Get Current KeyId.
*
* @return currentKeyId.
*/
@Override
public int getCurrentKeyId() {
return keyIdSeqCounter.getCount();
}
/**
* The Router Supports incrementCurrentKeyId.
*
* @return CurrentKeyId.
*/
@Override
public int incrementCurrentKeyId() {
try {
// It should be noted that the BatchSize of MasterKeyId defaults to 1.
incrSharedCount(keyIdSeqCounter, 1);
} catch (InterruptedException e) {
// The ExpirationThread is just finishing.. so don't do anything..
LOG.debug("Thread interrupted while performing Master keyId increment", e);
Thread.currentThread().interrupt();
} catch (Exception e) {
throw new RuntimeException("Could not increment shared Master keyId counter !!", e);
}
return keyIdSeqCounter.getCount();
}
}

View File

@ -22,6 +22,7 @@
import org.apache.hadoop.yarn.security.client.YARNDelegationTokenIdentifier;
import org.apache.hadoop.yarn.util.Records;
import java.io.DataInput;
import java.io.IOException;
@Private
@ -53,4 +54,12 @@ public static RouterStoreToken newInstance(YARNDelegationTokenIdentifier identif
@Private
@Unstable
public abstract void setRenewDate(Long renewDate);
@Private
@Unstable
public abstract byte[] toByteArray() throws IOException;
@Private
@Unstable
public abstract void readFields(DataInput in) throws IOException;
}

View File

@ -28,6 +28,7 @@
import org.apache.hadoop.yarn.federation.proto.YarnServerFederationProtos.RouterStoreTokenProtoOrBuilder;
import java.io.ByteArrayInputStream;
import java.io.DataInput;
import java.io.DataInputStream;
import java.io.IOException;
@ -168,4 +169,12 @@ private YARNDelegationTokenIdentifierProto convertToProtoFormat(
YARNDelegationTokenIdentifier delegationTokenIdentifier) {
return delegationTokenIdentifier.getProto();
}
public byte[] toByteArray() throws IOException {
return builder.build().toByteArray();
}
public void readFields(DataInput in) throws IOException {
builder.mergeFrom((DataInputStream) in);
}
}

View File

@ -0,0 +1,105 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.federation.store.utils;
import org.apache.hadoop.yarn.security.client.YARNDelegationTokenIdentifier;
import org.apache.hadoop.yarn.server.federation.store.exception.FederationStateStoreInvalidInputException;
import org.apache.hadoop.yarn.server.federation.store.records.RouterRMTokenRequest;
import org.apache.hadoop.yarn.server.federation.store.records.RouterStoreToken;
import org.apache.hadoop.yarn.server.federation.store.records.RouterMasterKeyRequest;
import org.apache.hadoop.yarn.server.federation.store.records.RouterMasterKey;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public final class FederationRouterRMTokenInputValidator {
private static final Logger LOG =
LoggerFactory.getLogger(FederationRouterRMTokenInputValidator.class);
private FederationRouterRMTokenInputValidator() {
}
/**
* We will check with the RouterRMTokenRequest{@link RouterRMTokenRequest}
* to ensure that the request object is not empty and that the RouterStoreToken is not empty.
*
* @param request RouterRMTokenRequest Request.
* @throws FederationStateStoreInvalidInputException if the request is invalid.
*/
public static void validate(RouterRMTokenRequest request)
throws FederationStateStoreInvalidInputException {
if (request == null) {
String message = "Missing RouterRMToken Request."
+ " Please try again by specifying a router rm token information.";
LOG.warn(message);
throw new FederationStateStoreInvalidInputException(message);
}
RouterStoreToken storeToken = request.getRouterStoreToken();
if (storeToken == null) {
String message = "Missing RouterStoreToken."
+ " Please try again by specifying a router rm token information.";
LOG.warn(message);
throw new FederationStateStoreInvalidInputException(message);
}
try {
YARNDelegationTokenIdentifier identifier = storeToken.getTokenIdentifier();
if (identifier == null) {
String message = "Missing YARNDelegationTokenIdentifier."
+ " Please try again by specifying a router rm token information.";
LOG.warn(message);
throw new FederationStateStoreInvalidInputException(message);
}
} catch (Exception e) {
throw new FederationStateStoreInvalidInputException(e);
}
}
/**
* We will check with the RouterMasterKeyRequest{@link RouterMasterKeyRequest}
* to ensure that the request object is not empty and that the RouterMasterKey is not empty.
*
* @param request RouterMasterKey Request.
* @throws FederationStateStoreInvalidInputException if the request is invalid.
*/
public static void validate(RouterMasterKeyRequest request)
throws FederationStateStoreInvalidInputException {
// Verify the request to ensure that the request is not empty,
// if the request is found to be empty, an exception will be thrown.
if (request == null) {
String message = "Missing RouterMasterKey Request."
+ " Please try again by specifying a router master key request information.";
LOG.warn(message);
throw new FederationStateStoreInvalidInputException(message);
}
// Check whether the masterKey is empty,
// if the masterKey is empty, throw an exception message.
RouterMasterKey masterKey = request.getRouterMasterKey();
if (masterKey == null) {
String message = "Missing RouterMasterKey."
+ " Please try again by specifying a router master key information.";
LOG.warn(message);
throw new FederationStateStoreInvalidInputException(message);
}
}
}

View File

@ -504,7 +504,7 @@ public void deleteReservationHomeSubCluster(ReservationId reservationId) throws
* @param defaultValue the default implementation for fallback
* @param type the class for which a retry proxy is required
* @param retryPolicy the policy for retrying method call failures
* @param <T> The type of the instance
* @param <T> The type of the instance.
* @return a retry proxy for the specified interface
*/
public static <T> Object createRetryInstance(Configuration conf,
@ -859,6 +859,51 @@ public RouterRMTokenResponse getTokenByRouterStoreToken(RMDelegationTokenIdentif
return stateStore.getTokenByRouterStoreToken(request);
}
/**
* stateStore provides DelegationTokenSeqNum increase.
*
* @return delegationTokenSequenceNumber.
*/
public int incrementDelegationTokenSeqNum() {
return stateStore.incrementDelegationTokenSeqNum();
}
/**
* Get SeqNum from stateStore.
*
* @return delegationTokenSequenceNumber.
*/
public int getDelegationTokenSeqNum() {
return stateStore.getDelegationTokenSeqNum();
}
/**
* Set SeqNum from stateStore.
*
* @param seqNum delegationTokenSequenceNumber.
*/
public void setDelegationTokenSeqNum(int seqNum) {
stateStore.setDelegationTokenSeqNum(seqNum);
}
/**
* Get CurrentKeyId from stateStore.
*
* @return currentKeyId.
*/
public int getCurrentKeyId() {
return stateStore.getCurrentKeyId();
}
/**
* stateStore provides CurrentKeyId increase.
*
* @return currentKeyId.
*/
public int incrementCurrentKeyId() {
return stateStore.incrementCurrentKeyId();
}
/**
* Get the number of active cluster nodes.
*

View File

@ -97,6 +97,12 @@ public abstract class FederationStateStoreBaseTest {
protected abstract FederationStateStore createStateStore();
protected abstract void checkRouterMasterKey(DelegationKey delegationKey,
RouterMasterKey routerMasterKey) throws YarnException, IOException;
protected abstract void checkRouterStoreToken(RMDelegationTokenIdentifier identifier,
RouterStoreToken token) throws YarnException, IOException;
private Configuration conf;
@Before
@ -876,6 +882,8 @@ public void testStoreNewMasterKey() throws Exception {
Assert.assertEquals(routerMasterKey.getKeyId(), routerMasterKeyResp.getKeyId());
Assert.assertEquals(routerMasterKey.getKeyBytes(), routerMasterKeyResp.getKeyBytes());
Assert.assertEquals(routerMasterKey.getExpiryDate(), routerMasterKeyResp.getExpiryDate());
checkRouterMasterKey(key, routerMasterKey);
}
@Test
@ -949,6 +957,9 @@ public void testStoreNewToken() throws IOException, YarnException {
Assert.assertNotNull(storeTokenResp);
Assert.assertEquals(storeToken.getRenewDate(), storeTokenResp.getRenewDate());
Assert.assertEquals(storeToken.getTokenIdentifier(), storeTokenResp.getTokenIdentifier());
checkRouterStoreToken(identifier, storeToken);
checkRouterStoreToken(identifier, storeTokenResp);
}
@Test
@ -981,6 +992,8 @@ public void testUpdateStoredToken() throws IOException, YarnException {
Assert.assertNotNull(updateTokenResp);
Assert.assertEquals(updateToken.getRenewDate(), updateTokenResp.getRenewDate());
Assert.assertEquals(updateToken.getTokenIdentifier(), updateTokenResp.getTokenIdentifier());
checkRouterStoreToken(identifier, updateTokenResp);
}
@Test
@ -1029,5 +1042,7 @@ public void testGetTokenByRouterStoreToken() throws IOException, YarnException {
Assert.assertNotNull(getStoreTokenResp);
Assert.assertEquals(getStoreTokenResp.getRenewDate(), storeToken.getRenewDate());
Assert.assertEquals(getStoreTokenResp.getTokenIdentifier(), storeToken.getTokenIdentifier());
checkRouterStoreToken(identifier, getStoreTokenResp);
}
}

View File

@ -18,14 +18,29 @@
package org.apache.hadoop.yarn.server.federation.store.impl;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.security.token.delegation.DelegationKey;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier;
import org.apache.hadoop.yarn.security.client.YARNDelegationTokenIdentifier;
import org.apache.hadoop.yarn.server.federation.store.FederationStateStore;
import org.apache.hadoop.yarn.server.federation.store.records.RouterMasterKey;
import org.apache.hadoop.yarn.server.federation.store.records.RouterRMDTSecretManagerState;
import org.apache.hadoop.yarn.server.federation.store.records.RouterStoreToken;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Map;
import java.util.Set;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.assertEquals;
/**
* Unit tests for MemoryFederationStateStore.
*/
public class TestMemoryFederationStateStore
extends FederationStateStoreBaseTest {
public class TestMemoryFederationStateStore extends FederationStateStoreBaseTest {
@Override
protected FederationStateStore createStateStore() {
@ -34,4 +49,43 @@ protected FederationStateStore createStateStore() {
super.setConf(conf);
return new MemoryFederationStateStore();
}
@Override
protected void checkRouterMasterKey(DelegationKey delegationKey,
RouterMasterKey routerMasterKey) throws YarnException, IOException {
MemoryFederationStateStore memoryStateStore =
MemoryFederationStateStore.class.cast(this.getStateStore());
RouterRMDTSecretManagerState secretManagerState =
memoryStateStore.getRouterRMSecretManagerState();
assertNotNull(secretManagerState);
Set<DelegationKey> delegationKeys = secretManagerState.getMasterKeyState();
assertNotNull(delegationKeys);
assertTrue(delegationKeys.contains(delegationKey));
RouterMasterKey resultRouterMasterKey = RouterMasterKey.newInstance(delegationKey.getKeyId(),
ByteBuffer.wrap(delegationKey.getEncodedKey()), delegationKey.getExpiryDate());
assertEquals(resultRouterMasterKey, routerMasterKey);
}
@Override
protected void checkRouterStoreToken(RMDelegationTokenIdentifier identifier,
RouterStoreToken token) throws YarnException, IOException {
MemoryFederationStateStore memoryStateStore =
MemoryFederationStateStore.class.cast(this.getStateStore());
RouterRMDTSecretManagerState secretManagerState =
memoryStateStore.getRouterRMSecretManagerState();
assertNotNull(secretManagerState);
Map<RMDelegationTokenIdentifier, Long> tokenStateMap =
secretManagerState.getTokenState();
assertNotNull(tokenStateMap);
assertTrue(tokenStateMap.containsKey(identifier));
YARNDelegationTokenIdentifier tokenIdentifier = token.getTokenIdentifier();
assertTrue(tokenIdentifier instanceof RMDelegationTokenIdentifier);
assertEquals(identifier, tokenIdentifier);
}
}

View File

@ -18,12 +18,14 @@
package org.apache.hadoop.yarn.server.federation.store.impl;
import org.apache.commons.lang3.NotImplementedException;
import org.apache.hadoop.security.token.delegation.DelegationKey;
import org.apache.hadoop.test.LambdaTestUtils;
import org.apache.hadoop.util.Time;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ReservationId;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier;
import org.apache.hadoop.yarn.server.federation.store.FederationStateStore;
import org.apache.hadoop.yarn.server.federation.store.metrics.FederationStateStoreClientMetrics;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId;
@ -33,6 +35,8 @@
import org.apache.hadoop.yarn.server.federation.store.records.AddReservationHomeSubClusterRequest;
import org.apache.hadoop.yarn.server.federation.store.records.UpdateReservationHomeSubClusterRequest;
import org.apache.hadoop.yarn.server.federation.store.records.DeleteReservationHomeSubClusterRequest;
import org.apache.hadoop.yarn.server.federation.store.records.RouterMasterKey;
import org.apache.hadoop.yarn.server.federation.store.records.RouterStoreToken;
import org.apache.hadoop.yarn.server.federation.store.utils.FederationStateStoreUtils;
import org.junit.Assert;
import org.junit.Test;
@ -592,4 +596,18 @@ public void testRemoveStoredToken() throws IOException, YarnException {
public void testGetTokenByRouterStoreToken() throws IOException, YarnException {
super.testGetTokenByRouterStoreToken();
}
@Override
protected void checkRouterMasterKey(DelegationKey delegationKey,
RouterMasterKey routerMasterKey) throws YarnException, IOException {
// TODO: This part of the code will be completed in YARN-11349 and
// will be used to verify whether the RouterMasterKey stored in the DB is as expected.
}
@Override
protected void checkRouterStoreToken(RMDelegationTokenIdentifier identifier,
RouterStoreToken token) throws YarnException, IOException {
// TODO: This part of the code will be completed in YARN-11349 and
// will be used to verify whether the RouterStoreToken stored in the DB is as expected.
}
}

View File

@ -17,9 +17,11 @@
package org.apache.hadoop.yarn.server.federation.store.impl;
import java.io.ByteArrayInputStream;
import java.io.DataInputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import org.apache.commons.lang3.NotImplementedException;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.RetryNTimes;
@ -29,27 +31,52 @@
import org.apache.hadoop.metrics2.MetricsRecord;
import org.apache.hadoop.metrics2.impl.MetricsCollectorImpl;
import org.apache.hadoop.metrics2.impl.MetricsRecords;
import org.apache.hadoop.security.token.delegation.DelegationKey;
import org.apache.hadoop.util.Time;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier;
import org.apache.hadoop.yarn.server.federation.store.FederationStateStore;
import org.apache.hadoop.yarn.server.federation.store.records.RouterMasterKey;
import org.apache.hadoop.yarn.server.federation.store.records.RouterMasterKeyRequest;
import org.apache.hadoop.yarn.server.federation.store.records.RouterMasterKeyResponse;
import org.apache.hadoop.yarn.server.federation.store.records.RouterStoreToken;
import org.apache.hadoop.yarn.util.Records;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import static org.apache.hadoop.util.curator.ZKCuratorManager.getNodePath;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.assertNotNull;
/**
* Unit tests for ZookeeperFederationStateStore.
*/
public class TestZookeeperFederationStateStore
extends FederationStateStoreBaseTest {
public class TestZookeeperFederationStateStore extends FederationStateStoreBaseTest {
private static final Logger LOG =
LoggerFactory.getLogger(TestZookeeperFederationStateStore.class);
private static final String ZNODE_FEDERATIONSTORE =
"/federationstore";
private static final String ZNODE_ROUTER_RM_DT_SECRET_MANAGER_ROOT =
"/router_rm_dt_secret_manager_root";
private static final String ZNODE_ROUTER_RM_DELEGATION_TOKENS_ROOT_ZNODE_NAME =
"/router_rm_delegation_tokens_root";
private static final String ZNODE_ROUTER_RM_DT_MASTER_KEYS_ROOT_ZNODE_NAME =
"/router_rm_dt_master_keys_root/";
private static final String ROUTER_RM_DELEGATION_TOKEN_PREFIX = "rm_delegation_token_";
private static final String ROUTER_RM_DELEGATION_KEY_PREFIX = "delegation_key_";
private static final String ZNODE_DT_PREFIX = ZNODE_FEDERATIONSTORE +
ZNODE_ROUTER_RM_DT_SECRET_MANAGER_ROOT + ZNODE_ROUTER_RM_DELEGATION_TOKENS_ROOT_ZNODE_NAME;
private static final String ZNODE_MASTER_KEY_PREFIX = ZNODE_FEDERATIONSTORE +
ZNODE_ROUTER_RM_DT_SECRET_MANAGER_ROOT + ZNODE_ROUTER_RM_DT_MASTER_KEYS_ROOT_ZNODE_NAME;
/** Zookeeper test server. */
private static TestingServer curatorTestingServer;
private static CuratorFramework curatorFramework;
@ -171,38 +198,82 @@ public void testMetricsInited() throws Exception {
MetricsRecords.assertMetric(record, "UpdateReservationHomeSubClusterNumOps", expectOps);
}
@Test(expected = NotImplementedException.class)
public void testStoreNewMasterKey() throws Exception {
super.testStoreNewMasterKey();
private RouterStoreToken getStoreTokenFromZK(String nodePath)
throws YarnException {
try {
byte[] data = curatorFramework.getData().forPath(nodePath);
if ((data == null) || (data.length == 0)) {
return null;
}
ByteArrayInputStream bin = new ByteArrayInputStream(data);
DataInputStream din = new DataInputStream(bin);
RouterStoreToken storeToken = Records.newRecord(RouterStoreToken.class);
storeToken.readFields(din);
return storeToken;
} catch (Exception e) {
throw new YarnException(e);
}
}
@Test(expected = NotImplementedException.class)
public void testGetMasterKeyByDelegationKey() throws YarnException, IOException {
super.testGetMasterKeyByDelegationKey();
private RouterMasterKey getRouterMasterKeyFromZK(String nodePath) throws YarnException {
try {
byte[] data = curatorFramework.getData().forPath(nodePath);
ByteArrayInputStream bin = new ByteArrayInputStream(data);
DataInputStream din = new DataInputStream(bin);
DelegationKey zkDT = new DelegationKey();
zkDT.readFields(din);
RouterMasterKey zkRouterMasterKey = RouterMasterKey.newInstance(
zkDT.getKeyId(), ByteBuffer.wrap(zkDT.getEncodedKey()), zkDT.getExpiryDate());
return zkRouterMasterKey;
} catch (Exception e) {
throw new YarnException(e);
}
}
@Test(expected = NotImplementedException.class)
public void testRemoveStoredMasterKey() throws YarnException, IOException {
super.testRemoveStoredMasterKey();
private boolean isExists(String path) throws YarnException {
try {
return (curatorFramework.checkExists().forPath(path) != null);
} catch (Exception e) {
throw new YarnException(e);
}
}
@Test(expected = NotImplementedException.class)
public void testStoreNewToken() throws IOException, YarnException {
super.testStoreNewToken();
protected void checkRouterMasterKey(DelegationKey delegationKey,
RouterMasterKey routerMasterKey) throws YarnException, IOException {
// Check for MasterKey stored in ZK
RouterMasterKeyRequest routerMasterKeyRequest =
RouterMasterKeyRequest.newInstance(routerMasterKey);
// Get Data From zk.
String nodeName = ROUTER_RM_DELEGATION_KEY_PREFIX + delegationKey.getKeyId();
String nodePath = ZNODE_MASTER_KEY_PREFIX + nodeName;
RouterMasterKey zkRouterMasterKey = getRouterMasterKeyFromZK(nodePath);
// Call the getMasterKeyByDelegationKey interface to get the returned result.
// The zk data should be consistent with the returned data.
RouterMasterKeyResponse response = getStateStore().
getMasterKeyByDelegationKey(routerMasterKeyRequest);
assertNotNull(response);
RouterMasterKey respRouterMasterKey = response.getRouterMasterKey();
assertEquals(routerMasterKey, respRouterMasterKey);
assertEquals(routerMasterKey, zkRouterMasterKey);
assertEquals(zkRouterMasterKey, respRouterMasterKey);
}
@Test(expected = NotImplementedException.class)
public void testUpdateStoredToken() throws IOException, YarnException {
super.testUpdateStoredToken();
}
protected void checkRouterStoreToken(RMDelegationTokenIdentifier identifier,
RouterStoreToken token) throws YarnException, IOException {
// Get delegationToken Path
String nodeName = ROUTER_RM_DELEGATION_TOKEN_PREFIX + identifier.getSequenceNumber();
String nodePath = getNodePath(ZNODE_DT_PREFIX, nodeName);
@Test(expected = NotImplementedException.class)
public void testRemoveStoredToken() throws IOException, YarnException {
super.testRemoveStoredToken();
}
// Check if the path exists, we expect the result to exist.
assertTrue(isExists(nodePath));
@Test(expected = NotImplementedException.class)
public void testGetTokenByRouterStoreToken() throws IOException, YarnException {
super.testGetTokenByRouterStoreToken();
// Check whether the token (paramStoreToken)
// We generated is consistent with the data stored in zk.
// We expect data to be consistent.
RouterStoreToken zkRouterStoreToken = getStoreTokenFromZK(nodePath);
assertNotNull(zkRouterStoreToken);
assertEquals(token, zkRouterStoreToken);
}
}

View File

@ -423,6 +423,31 @@ public RouterRMTokenResponse getTokenByRouterStoreToken(RouterRMTokenRequest req
return stateStoreClient.getTokenByRouterStoreToken(request);
}
@Override
public int incrementDelegationTokenSeqNum() {
return stateStoreClient.incrementDelegationTokenSeqNum();
}
@Override
public int getDelegationTokenSeqNum() {
return stateStoreClient.getDelegationTokenSeqNum();
}
@Override
public void setDelegationTokenSeqNum(int seqNum) {
stateStoreClient.setDelegationTokenSeqNum(seqNum);
}
@Override
public int getCurrentKeyId() {
return stateStoreClient.getCurrentKeyId();
}
@Override
public int incrementCurrentKeyId() {
return stateStoreClient.incrementCurrentKeyId();
}
/**
* Create a thread that cleans up the app.
* @param stage rm-start/rm-stop.

View File

@ -251,4 +251,29 @@ public synchronized Map<RMDelegationTokenIdentifier, Long> getAllTokens() {
}
return allTokens;
}
@Override
protected synchronized int incrementDelegationTokenSeqNum() {
return federationFacade.incrementDelegationTokenSeqNum();
}
@Override
protected synchronized int getDelegationTokenSeqNum() {
return federationFacade.getDelegationTokenSeqNum();
}
@Override
protected synchronized void setDelegationTokenSeqNum(int seqNum) {
federationFacade.setDelegationTokenSeqNum(seqNum);
}
@Override
protected synchronized int getCurrentKeyId() {
return federationFacade.getCurrentKeyId();
}
@Override
protected synchronized int incrementCurrentKeyId() {
return federationFacade.incrementCurrentKeyId();
}
}