YARN-7262. Add a hierarchy into the ZKRMStateStore for delegation token znodes to prevent jute buffer overflow (rkanter)

This commit is contained in:
Robert Kanter 2017-10-26 17:47:32 -07:00
parent 088ffee716
commit b1de78619f
5 changed files with 673 additions and 137 deletions

View File

@ -639,6 +639,13 @@ public class YarnConfiguration extends Configuration {
RM_ZK_PREFIX + "appid-node.split-index";
public static final int DEFAULT_ZK_APPID_NODE_SPLIT_INDEX = 0;
/** Index at which the RM Delegation Token ids will be split so that the
* delegation token znodes stored in the zookeeper RM state store will be
* stored as two different znodes (parent-child). **/
public static final String ZK_DELEGATION_TOKEN_NODE_SPLIT_INDEX =
RM_ZK_PREFIX + "delegation-token-node.split-index";
public static final int DEFAULT_ZK_DELEGATION_TOKEN_NODE_SPLIT_INDEX = 0;
public static final String RM_ZK_ACL = RM_ZK_PREFIX + "acl";
public static final String DEFAULT_RM_ZK_ACL = "world:anyone:rwcda";

View File

@ -593,6 +593,24 @@
<value>0</value>
</property>
<property>
<description>Index at which the RM Delegation Token ids will be split so
that the delegation token znodes stored in the zookeeper RM state store
will be stored as two different znodes (parent-child). The split is done
from the end. For instance, with no split, a delegation token znode will
be of the form RMDelegationToken_123456789. If the value of this config is
1, the delegation token znode will be broken into two parts:
RMDelegationToken_12345678 and 9 respectively with former being the parent
node. This config can take values from 0 to 4. 0 means there will be no
split. If the value is outside this range, it will be treated as 0 (i.e.
no split). A value larger than 0 (up to 4) should be configured if you are
running a large number of applications, with long-lived delegation tokens
and state store operations (e.g. failover) are failing due to LenError in
Zookeeper.</description>
<name>yarn.resourcemanager.zk-delegation-token-node.split-index</name>
<value>0</value>
</property>
<property>
<description>Specifies the maximum size of the data that can be stored
in a znode. Value should be same or less than jute.maxbuffer configured

View File

@ -89,6 +89,8 @@ public abstract class RMStateStore extends AbstractService {
@VisibleForTesting
public static final String RM_APP_ROOT = "RMAppRoot";
protected static final String RM_DT_SECRET_MANAGER_ROOT = "RMDTSecretManagerRoot";
protected static final String RM_DELEGATION_TOKENS_ROOT_ZNODE_NAME =
"RMDelegationTokensRoot";
protected static final String DELEGATION_KEY_PREFIX = "DelegationKey_";
protected static final String DELEGATION_TOKEN_PREFIX = "RMDelegationToken_";
protected static final String DELEGATION_TOKEN_SEQUENCE_NUMBER_PREFIX =

View File

@ -118,6 +118,22 @@ import java.util.Set;
* |--- RM_DT_SECRET_MANAGER_ROOT
* |----- RM_DT_SEQUENTIAL_NUMBER_ZNODE_NAME
* |----- RM_DELEGATION_TOKENS_ROOT_ZNODE_NAME
* | |----- 1
* | | |----- (#TokenId barring last character)
* | | | |----- (#Last character of TokenId)
* | | ....
* | |----- 2
* | | |----- (#TokenId barring last 2 characters)
* | | | |----- (#Last 2 characters of TokenId)
* | | ....
* | |----- 3
* | | |----- (#TokenId barring last 3 characters)
* | | | |----- (#Last 3 characters of TokenId)
* | | ....
* | |----- 4
* | | |----- (#TokenId barring last 4 characters)
* | | | |----- (#Last 4 characters of TokenId)
* | | ....
* | |----- Token_1
* | |----- Token_2
* | ....
@ -147,6 +163,11 @@ import java.util.Set;
* splitting it in 2 parts, depending on a configurable split index. This limits
* the number of application znodes returned in a single call while loading
* app state.
*
* Changes from 1.4 to 1.5 - Change the structure of delegation token znode by
* splitting it in 2 parts, depending on a configurable split index. This limits
* the number of delegation token znodes returned in a single call while loading
* tokens state.
*/
@Private
@Unstable
@ -162,7 +183,7 @@ public class ZKRMStateStore extends RMStateStore {
@VisibleForTesting
public static final String ROOT_ZNODE_NAME = "ZKRMStateRoot";
protected static final Version CURRENT_VERSION_INFO = Version
.newInstance(1, 4);
.newInstance(1, 5);
@VisibleForTesting
public static final String RM_APP_ROOT_HIERARCHIES = "HIERARCHIES";
@ -170,6 +191,7 @@ public class ZKRMStateStore extends RMStateStore {
private String zkRootNodePath;
private String rmAppRoot;
private Map<Integer, String> rmAppRootHierarchies;
private Map<Integer, String> rmDelegationTokenHierarchies;
private String rmDTSecretManagerRoot;
private String dtMasterKeysRootPath;
private String delegationTokensRootPath;
@ -180,6 +202,8 @@ public class ZKRMStateStore extends RMStateStore {
@VisibleForTesting
protected String znodeWorkingPath;
private int appIdNodeSplitIndex = 0;
@VisibleForTesting
protected int delegationTokenNodeSplitIndex = 0;
/* Fencing related variables */
private static final String FENCING_LOCK = "RM_ZK_FENCING_LOCK";
@ -212,12 +236,13 @@ public class ZKRMStateStore extends RMStateStore {
};
/**
* Encapsulates full app node path and corresponding split index.
* Encapsulates znode path and corresponding split index for hierarchical
* znode layouts.
*/
private final static class AppNodeSplitInfo {
private final static class ZnodeSplitInfo {
private final String path;
private final int splitIndex;
AppNodeSplitInfo(String path, int splitIndex) {
ZnodeSplitInfo(String path, int splitIndex) {
this.path = path;
this.splitIndex = splitIndex;
}
@ -288,7 +313,7 @@ public class ZKRMStateStore extends RMStateStore {
appIdNodeSplitIndex =
conf.getInt(YarnConfiguration.ZK_APPID_NODE_SPLIT_INDEX,
YarnConfiguration.DEFAULT_ZK_APPID_NODE_SPLIT_INDEX);
if (appIdNodeSplitIndex < 1 || appIdNodeSplitIndex > 4) {
if (appIdNodeSplitIndex < 0 || appIdNodeSplitIndex > 4) {
LOG.info("Invalid value " + appIdNodeSplitIndex + " for config " +
YarnConfiguration.ZK_APPID_NODE_SPLIT_INDEX + " specified. " +
"Resetting it to " +
@ -322,12 +347,30 @@ public class ZKRMStateStore extends RMStateStore {
RM_DT_MASTER_KEYS_ROOT_ZNODE_NAME);
delegationTokensRootPath = getNodePath(rmDTSecretManagerRoot,
RM_DELEGATION_TOKENS_ROOT_ZNODE_NAME);
rmDelegationTokenHierarchies = new HashMap<>(5);
rmDelegationTokenHierarchies.put(0, delegationTokensRootPath);
for (int splitIndex = 1; splitIndex <= 4; splitIndex++) {
rmDelegationTokenHierarchies.put(splitIndex,
getNodePath(delegationTokensRootPath, Integer.toString(splitIndex)));
}
dtSequenceNumberPath = getNodePath(rmDTSecretManagerRoot,
RM_DT_SEQUENTIAL_NUMBER_ZNODE_NAME);
amrmTokenSecretManagerRoot =
getNodePath(zkRootNodePath, AMRMTOKEN_SECRET_MANAGER_ROOT);
reservationRoot = getNodePath(zkRootNodePath, RESERVATION_SYSTEM_ROOT);
zkManager = resourceManager.getAndStartZKManager(conf);
delegationTokenNodeSplitIndex =
conf.getInt(YarnConfiguration.ZK_DELEGATION_TOKEN_NODE_SPLIT_INDEX,
YarnConfiguration.DEFAULT_ZK_DELEGATION_TOKEN_NODE_SPLIT_INDEX);
if (delegationTokenNodeSplitIndex < 0
|| delegationTokenNodeSplitIndex > 4) {
LOG.info("Invalid value " + delegationTokenNodeSplitIndex + " for config "
+ YarnConfiguration.ZK_DELEGATION_TOKEN_NODE_SPLIT_INDEX
+ " specified. Resetting it to " +
YarnConfiguration.DEFAULT_ZK_DELEGATION_TOKEN_NODE_SPLIT_INDEX);
delegationTokenNodeSplitIndex =
YarnConfiguration.DEFAULT_ZK_DELEGATION_TOKEN_NODE_SPLIT_INDEX;
}
}
@Override
@ -350,6 +393,9 @@ public class ZKRMStateStore extends RMStateStore {
create(rmDTSecretManagerRoot);
create(dtMasterKeysRootPath);
create(delegationTokensRootPath);
for (int splitIndex = 1; splitIndex <= 4; splitIndex++) {
create(rmDelegationTokenHierarchies.get(splitIndex));
}
create(dtSequenceNumberPath);
create(amrmTokenSecretManagerRoot);
create(reservationRoot);
@ -572,23 +618,52 @@ public class ZKRMStateStore extends RMStateStore {
}
private void loadRMDelegationTokenState(RMState rmState) throws Exception {
List<String> childNodes =
getChildren(delegationTokensRootPath);
for (String childNodeName : childNodes) {
String childNodePath =
getNodePath(delegationTokensRootPath, childNodeName);
byte[] childData = getData(childNodePath);
if (childData == null) {
LOG.warn("Content of " + childNodePath + " is broken.");
for (int splitIndex = 0; splitIndex <= 4; splitIndex++) {
String tokenRoot = rmDelegationTokenHierarchies.get(splitIndex);
if (tokenRoot == null) {
continue;
}
ByteArrayInputStream is = new ByteArrayInputStream(childData);
try (DataInputStream fsIn = new DataInputStream(is)) {
List<String> childNodes = getChildren(tokenRoot);
boolean dtNodeFound = false;
for (String childNodeName : childNodes) {
if (childNodeName.startsWith(DELEGATION_TOKEN_PREFIX)) {
dtNodeFound = true;
String parentNodePath = getNodePath(tokenRoot, childNodeName);
if (splitIndex == 0) {
loadDelegationTokenFromNode(rmState, parentNodePath);
} else {
// If znode is partitioned.
List<String> leafNodes = getChildren(parentNodePath);
for (String leafNodeName : leafNodes) {
loadDelegationTokenFromNode(rmState,
getNodePath(parentNodePath, leafNodeName));
}
}
} else if (splitIndex == 0
&& !(childNodeName.equals("1") || childNodeName.equals("2")
|| childNodeName.equals("3") || childNodeName.equals("4"))) {
LOG.debug("Unknown child node with name " + childNodeName + " under" +
tokenRoot);
}
}
if (splitIndex != delegationTokenNodeSplitIndex && !dtNodeFound) {
// If no loaded delegation token exists for a particular split index and
// the split index for which tokens are being loaded is not the one
// configured, then we do not need to keep track of this hierarchy for
// storing/updating/removing delegation token znodes.
rmDelegationTokenHierarchies.remove(splitIndex);
}
}
}
private void loadDelegationTokenFromNode(RMState rmState, String path)
throws Exception {
byte[] data = getData(path);
if (data == null) {
LOG.warn("Content of " + path + " is broken.");
} else {
ByteArrayInputStream is = new ByteArrayInputStream(data);
try (DataInputStream fsIn = new DataInputStream(is)) {
RMDelegationTokenIdentifierData identifierData =
new RMDelegationTokenIdentifierData();
identifierData.readFields(fsIn);
@ -597,7 +672,6 @@ public class ZKRMStateStore extends RMStateStore {
long renewDate = identifierData.getRenewDate();
rmState.rmSecretManagerState.delegationTokenState.put(identifier,
renewDate);
if (LOG.isDebugEnabled()) {
LOG.debug("Loaded RMDelegationTokenIdentifier: " + identifier
+ " renewDate=" + renewDate);
@ -605,7 +679,6 @@ public class ZKRMStateStore extends RMStateStore {
}
}
}
}
private void loadRMAppStateFromAppNode(RMState rmState, String appNodePath,
String appIdStr) throws Exception {
@ -649,8 +722,9 @@ public class ZKRMStateStore extends RMStateStore {
getNodePath(parentNodePath, leafNodeName), appIdStr);
}
}
} else {
LOG.info("Unknown child node with name: " + childNodeName);
} else if (!childNodeName.equals(RM_APP_ROOT_HIERARCHIES)){
LOG.debug("Unknown child node with name " + childNodeName + " under" +
appRoot);
}
}
if (splitIndex != appIdNodeSplitIndex && !appNodeFound) {
@ -683,36 +757,36 @@ public class ZKRMStateStore extends RMStateStore {
}
/**
* Get parent app node path based on full path and split index supplied.
* @param appIdPath App id path for which parent needs to be returned.
* Get znode path based on full path and split index supplied.
* @param path path for which parent needs to be returned.
* @param splitIndex split index.
* @return parent app node path.
*/
private String getSplitAppNodeParent(String appIdPath, int splitIndex) {
// Calculated as string upto index (appIdPath Length - split index - 1). We
private String getSplitZnodeParent(String path, int splitIndex) {
// Calculated as string up to index (path Length - split index - 1). We
// deduct 1 to exclude path separator.
return appIdPath.substring(0, appIdPath.length() - splitIndex - 1);
return path.substring(0, path.length() - splitIndex - 1);
}
/**
* Checks if parent app node has no leaf nodes and if it does not have,
* removes it. Called while removing application.
* @param appIdPath path of app id to be removed.
* Checks if parent znode has no leaf nodes and if it does not have,
* removes it.
* @param path path of znode to be removed.
* @param splitIndex split index.
* @throws Exception if any problem occurs while performing ZK operation.
*/
private void checkRemoveParentAppNode(String appIdPath, int splitIndex)
private void checkRemoveParentZnode(String path, int splitIndex)
throws Exception {
if (splitIndex != 0) {
String parentAppNode = getSplitAppNodeParent(appIdPath, splitIndex);
String parentZnode = getSplitZnodeParent(path, splitIndex);
List<String> children = null;
try {
children = getChildren(parentAppNode);
children = getChildren(parentZnode);
} catch (KeeperException.NoNodeException ke) {
// It should be fine to swallow this exception as the parent app node we
// It should be fine to swallow this exception as the parent znode we
// intend to delete is already deleted.
if (LOG.isDebugEnabled()) {
LOG.debug("Unable to remove app parent node " + parentAppNode +
LOG.debug("Unable to remove parent node " + parentZnode +
" as it does not exist.");
}
return;
@ -720,16 +794,16 @@ public class ZKRMStateStore extends RMStateStore {
// No apps stored under parent path.
if (children != null && children.isEmpty()) {
try {
zkManager.safeDelete(parentAppNode, zkAcl, fencingNodePath);
zkManager.safeDelete(parentZnode, zkAcl, fencingNodePath);
if (LOG.isDebugEnabled()) {
LOG.debug("No leaf app node exists. Removing parent node " +
parentAppNode);
LOG.debug("No leaf znode exists. Removing parent node " +
parentZnode);
}
} catch (KeeperException.NotEmptyException ke) {
// It should be fine to swallow this exception as the parent app node
// It should be fine to swallow this exception as the parent znode
// has to be deleted only if it has no children. And this node has.
if (LOG.isDebugEnabled()) {
LOG.debug("Unable to remove app parent node " + parentAppNode +
LOG.debug("Unable to remove app parent node " + parentZnode +
" as it has children.");
}
}
@ -770,7 +844,7 @@ public class ZKRMStateStore extends RMStateStore {
// Look for paths based on other split indices if path as per split index
// does not exist.
if (!exists(nodeUpdatePath)) {
AppNodeSplitInfo alternatePathInfo = getAlternatePath(appId.toString());
ZnodeSplitInfo alternatePathInfo = getAlternateAppPath(appId.toString());
if (alternatePathInfo != null) {
nodeUpdatePath = alternatePathInfo.path;
} else {
@ -778,7 +852,7 @@ public class ZKRMStateStore extends RMStateStore {
pathExists = false;
if (appIdNodeSplitIndex != 0) {
String rootNode =
getSplitAppNodeParent(nodeUpdatePath, appIdNodeSplitIndex);
getSplitZnodeParent(nodeUpdatePath, appIdNodeSplitIndex);
if (!exists(rootNode)) {
zkManager.safeCreate(rootNode, null, zkAcl, CreateMode.PERSISTENT,
zkAcl, fencingNodePath);
@ -819,7 +893,7 @@ public class ZKRMStateStore extends RMStateStore {
String appDirPath = getLeafAppIdNodePath(appId, false);
// Look for paths based on other split indices.
if (!exists(appDirPath)) {
AppNodeSplitInfo alternatePathInfo = getAlternatePath(appId);
ZnodeSplitInfo alternatePathInfo = getAlternateAppPath(appId);
if (alternatePathInfo == null) {
if (operation == AppAttemptOp.REMOVE) {
// Unexpected. Assume that app attempt has been deleted.
@ -918,7 +992,7 @@ public class ZKRMStateStore extends RMStateStore {
// Look for paths based on other split indices if path as per configured
// split index does not exist.
if (!exists(appIdRemovePath)) {
AppNodeSplitInfo alternatePathInfo = getAlternatePath(removeAppId);
ZnodeSplitInfo alternatePathInfo = getAlternateAppPath(removeAppId);
if (alternatePathInfo != null) {
appIdRemovePath = alternatePathInfo.path;
splitIndex = alternatePathInfo.splitIndex;
@ -946,75 +1020,26 @@ public class ZKRMStateStore extends RMStateStore {
forPath(appIdRemovePath);
}
// Check if we should remove the parent app node as well.
checkRemoveParentAppNode(appIdRemovePath, splitIndex);
checkRemoveParentZnode(appIdRemovePath, splitIndex);
}
@Override
protected synchronized void storeRMDelegationTokenState(
RMDelegationTokenIdentifier rmDTIdentifier, Long renewDate)
throws Exception {
SafeTransaction trx = zkManager.createTransaction(zkAcl, fencingNodePath);
addStoreOrUpdateOps(trx, rmDTIdentifier, renewDate, false);
trx.commit();
}
@Override
protected synchronized void removeRMDelegationTokenState(
RMDelegationTokenIdentifier rmDTIdentifier) throws Exception {
String nodeRemovePath =
getNodePath(delegationTokensRootPath, DELEGATION_TOKEN_PREFIX
+ rmDTIdentifier.getSequenceNumber());
String nodeCreatePath = getLeafDelegationTokenNodePath(
rmDTIdentifier.getSequenceNumber(), true);
if (LOG.isDebugEnabled()) {
LOG.debug("Removing RMDelegationToken_"
LOG.debug("Storing " + DELEGATION_TOKEN_PREFIX
+ rmDTIdentifier.getSequenceNumber());
}
zkManager.safeDelete(nodeRemovePath, zkAcl, fencingNodePath);
}
@Override
protected synchronized void updateRMDelegationTokenState(
RMDelegationTokenIdentifier rmDTIdentifier, Long renewDate)
throws Exception {
SafeTransaction trx = zkManager.createTransaction(zkAcl, fencingNodePath);
String nodeRemovePath =
getNodePath(delegationTokensRootPath, DELEGATION_TOKEN_PREFIX
+ rmDTIdentifier.getSequenceNumber());
if (exists(nodeRemovePath)) {
// in case znode exists
addStoreOrUpdateOps(trx, rmDTIdentifier, renewDate, true);
} else {
// in case znode doesn't exist
addStoreOrUpdateOps(trx, rmDTIdentifier, renewDate, false);
if (LOG.isDebugEnabled()) {
LOG.debug("Attempted to update a non-existing znode " + nodeRemovePath);
}
}
trx.commit();
}
private void addStoreOrUpdateOps(SafeTransaction trx,
RMDelegationTokenIdentifier rmDTIdentifier, Long renewDate,
boolean isUpdate) throws Exception {
// store RM delegation token
String nodeCreatePath = getNodePath(delegationTokensRootPath,
DELEGATION_TOKEN_PREFIX + rmDTIdentifier.getSequenceNumber());
RMDelegationTokenIdentifierData identifierData =
new RMDelegationTokenIdentifierData(rmDTIdentifier, renewDate);
ByteArrayOutputStream seqOs = new ByteArrayOutputStream();
try (DataOutputStream seqOut = new DataOutputStream(seqOs)) {
if (isUpdate) {
if (LOG.isDebugEnabled()) {
LOG.debug("Updating RMDelegationToken_"
+ rmDTIdentifier.getSequenceNumber());
}
trx.setData(nodeCreatePath, identifierData.toByteArray(), -1);
} else {
SafeTransaction trx = zkManager.createTransaction(zkAcl,
fencingNodePath);
trx.create(nodeCreatePath, identifierData.toByteArray(), zkAcl,
CreateMode.PERSISTENT);
// Update Sequence number only while storing DT
@ -1026,8 +1051,72 @@ public class ZKRMStateStore extends RMStateStore {
}
trx.setData(dtSequenceNumberPath, seqOs.toByteArray(), -1);
trx.commit();
}
}
@Override
protected synchronized void removeRMDelegationTokenState(
RMDelegationTokenIdentifier rmDTIdentifier) throws Exception {
String nodeRemovePath = getLeafDelegationTokenNodePath(
rmDTIdentifier.getSequenceNumber(), false);
int splitIndex = delegationTokenNodeSplitIndex;
// Look for paths based on other split indices if path as per configured
// split index does not exist.
if (!exists(nodeRemovePath)) {
ZnodeSplitInfo alternatePathInfo =
getAlternateDTPath(rmDTIdentifier.getSequenceNumber());
if (alternatePathInfo != null) {
nodeRemovePath = alternatePathInfo.path;
splitIndex = alternatePathInfo.splitIndex;
} else {
// Alternate path not found so return.
return;
}
}
if (LOG.isDebugEnabled()) {
LOG.debug("Removing RMDelegationToken_"
+ rmDTIdentifier.getSequenceNumber());
}
zkManager.safeDelete(nodeRemovePath, zkAcl, fencingNodePath);
// Check if we should remove the parent app node as well.
checkRemoveParentZnode(nodeRemovePath, splitIndex);
}
@Override
protected synchronized void updateRMDelegationTokenState(
RMDelegationTokenIdentifier rmDTIdentifier, Long renewDate)
throws Exception {
String nodeUpdatePath = getLeafDelegationTokenNodePath(
rmDTIdentifier.getSequenceNumber(), false);
boolean pathExists = true;
// Look for paths based on other split indices if path as per split index
// does not exist.
if (!exists(nodeUpdatePath)) {
ZnodeSplitInfo alternatePathInfo =
getAlternateDTPath(rmDTIdentifier.getSequenceNumber());
if (alternatePathInfo != null) {
nodeUpdatePath = alternatePathInfo.path;
} else {
pathExists = false;
}
}
if (pathExists) {
if (LOG.isDebugEnabled()) {
LOG.debug("Updating " + DELEGATION_TOKEN_PREFIX
+ rmDTIdentifier.getSequenceNumber());
}
RMDelegationTokenIdentifierData identifierData =
new RMDelegationTokenIdentifierData(rmDTIdentifier, renewDate);
zkManager.safeSetData(nodeUpdatePath, identifierData.toByteArray(), -1,
zkAcl, fencingNodePath);
} else {
storeRMDelegationTokenState(rmDTIdentifier, renewDate);
}
}
@Override
@ -1156,19 +1245,19 @@ public class ZKRMStateStore extends RMStateStore {
* Get alternate path for app id if path according to configured split index
* does not exist. We look for path based on all possible split indices.
* @param appId
* @return a {@link AppNodeSplitInfo} object containing the path and split
* @return a {@link ZnodeSplitInfo} object containing the path and split
* index if it exists, null otherwise.
* @throws Exception if any problem occurs while performing ZK operation.
*/
private AppNodeSplitInfo getAlternatePath(String appId) throws Exception {
private ZnodeSplitInfo getAlternateAppPath(String appId) throws Exception {
for (Map.Entry<Integer, String> entry : rmAppRootHierarchies.entrySet()) {
// Look for other paths
int splitIndex = entry.getKey();
if (splitIndex != appIdNodeSplitIndex) {
String alternatePath =
getLeafAppIdNodePath(appId, entry.getValue(), splitIndex, false);
getLeafZnodePath(appId, entry.getValue(), splitIndex, false);
if (exists(alternatePath)) {
return new AppNodeSplitInfo(alternatePath, splitIndex);
return new ZnodeSplitInfo(alternatePath, splitIndex);
}
}
}
@ -1176,26 +1265,25 @@ public class ZKRMStateStore extends RMStateStore {
}
/**
* Returns leaf app node path based on app id and passed split index. If the
* passed flag createParentIfNotExists is true, also creates the parent app
* node if it does not exist.
* @param appId application id.
* Returns leaf znode path based on node name and passed split index. If the
* passed flag createParentIfNotExists is true, also creates the parent znode
* if it does not exist.
* @param nodeName the node name.
* @param rootNode app root node based on split index.
* @param appIdNodeSplitIdx split index.
* @param createParentIfNotExists flag which determines if parent app node
* @param splitIdx split index.
* @param createParentIfNotExists flag which determines if parent znode
* needs to be created(as per split) if it does not exist.
* @return leaf app node path.
* @return leaf znode path.
* @throws Exception if any problem occurs while performing ZK operation.
*/
private String getLeafAppIdNodePath(String appId, String rootNode,
int appIdNodeSplitIdx, boolean createParentIfNotExists) throws Exception {
if (appIdNodeSplitIdx == 0) {
return getNodePath(rootNode, appId);
private String getLeafZnodePath(String nodeName, String rootNode,
int splitIdx, boolean createParentIfNotExists) throws Exception {
if (splitIdx == 0) {
return getNodePath(rootNode, nodeName);
}
String nodeName = appId;
int splitIdx = nodeName.length() - appIdNodeSplitIdx;
int split = nodeName.length() - splitIdx;
String rootNodePath =
getNodePath(rootNode, nodeName.substring(0, splitIdx));
getNodePath(rootNode, nodeName.substring(0, split));
if (createParentIfNotExists && !exists(rootNodePath)) {
try {
zkManager.safeCreate(rootNodePath, null, zkAcl, CreateMode.PERSISTENT,
@ -1207,7 +1295,7 @@ public class ZKRMStateStore extends RMStateStore {
}
}
}
return getNodePath(rootNodePath, nodeName.substring(splitIdx));
return getNodePath(rootNodePath, nodeName.substring(split));
}
/**
@ -1222,10 +1310,77 @@ public class ZKRMStateStore extends RMStateStore {
*/
private String getLeafAppIdNodePath(String appId,
boolean createParentIfNotExists) throws Exception {
return getLeafAppIdNodePath(appId, rmAppRootHierarchies.get(
return getLeafZnodePath(appId, rmAppRootHierarchies.get(
appIdNodeSplitIndex), appIdNodeSplitIndex, createParentIfNotExists);
}
/**
* Returns leaf delegation token node path based on sequence number and
* configured split index. If the passed flag createParentIfNotExists is true,
* also creates the parent znode if it does not exist. The sequence number
* is padded to be at least 4 digits wide to ensure consistency with the split
* indexing.
* @param rmDTSequenceNumber delegation token sequence number.
* @param createParentIfNotExists flag which determines if parent znode
* needs to be created(as per split) if it does not exist.
* @return leaf delegation token node path.
* @throws Exception if any problem occurs while performing ZK operation.
*/
private String getLeafDelegationTokenNodePath(int rmDTSequenceNumber,
boolean createParentIfNotExists) throws Exception {
return getLeafDelegationTokenNodePath(rmDTSequenceNumber,
createParentIfNotExists, delegationTokenNodeSplitIndex);
}
/**
* Returns leaf delegation token node path based on sequence number and
* passed split index. If the passed flag createParentIfNotExists is true,
* also creates the parent znode if it does not exist. The sequence number
* is padded to be at least 4 digits wide to ensure consistency with the split
* indexing.
* @param rmDTSequenceNumber delegation token sequence number.
* @param createParentIfNotExists flag which determines if parent znode
* needs to be created(as per split) if it does not exist.
* @param split the split index to use
* @return leaf delegation token node path.
* @throws Exception if any problem occurs while performing ZK operation.
*/
private String getLeafDelegationTokenNodePath(int rmDTSequenceNumber,
boolean createParentIfNotExists, int split) throws Exception {
String nodeName = DELEGATION_TOKEN_PREFIX;
if (split == 0) {
nodeName += rmDTSequenceNumber;
} else {
nodeName += String.format("%04d", rmDTSequenceNumber);
}
return getLeafZnodePath(nodeName, rmDelegationTokenHierarchies.get(split),
split, createParentIfNotExists);
}
/**
* Get alternate path for delegation token if path according to configured
* split index does not exist. We look for path based on all possible split
* indices.
* @param rmDTSequenceNumber delegation token sequence number.
* @return a {@link ZnodeSplitInfo} object containing the path and split
* index if it exists, null otherwise.
* @throws Exception if any problem occurs while performing ZK operation.
*/
private ZnodeSplitInfo getAlternateDTPath(int rmDTSequenceNumber)
throws Exception {
// Check all possible paths until we find it
for (int splitIndex : rmDelegationTokenHierarchies.keySet()) {
if (splitIndex != delegationTokenNodeSplitIndex) {
String alternatePath = getLeafDelegationTokenNodePath(
rmDTSequenceNumber, false, splitIndex);
if (exists(alternatePath)) {
return new ZnodeSplitInfo(alternatePath, splitIndex);
}
}
}
return null;
}
@VisibleForTesting
byte[] getData(final String path) throws Exception {
return zkManager.getData(path);

View File

@ -64,7 +64,6 @@ import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.ZooDefs.Perms;
import org.apache.zookeeper.data.ACL;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
@ -75,16 +74,20 @@ import com.google.common.collect.Lists;
import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.when;
import java.io.IOException;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import javax.crypto.SecretKey;
@ -128,10 +131,9 @@ public class TestZKRMStateStore extends RMStateStoreTestBase {
TestZKRMStateStoreInternal store;
String workingZnode;
class TestZKRMStateStoreInternal extends ZKRMStateStore {
public TestZKRMStateStoreInternal(Configuration conf, String workingZnode)
TestZKRMStateStoreInternal(Configuration conf, String workingZnode)
throws Exception {
setResourceManager(new ResourceManager());
init(conf);
@ -140,7 +142,7 @@ public class TestZKRMStateStore extends RMStateStoreTestBase {
assertTrue(znodeWorkingPath.equals(workingZnode));
}
public String getVersionNode() {
private String getVersionNode() {
return znodeWorkingPath + "/" + ROOT_ZNODE_NAME + "/" + VERSION_NODE;
}
@ -162,11 +164,11 @@ public class TestZKRMStateStore extends RMStateStoreTestBase {
return rootPath + "/" + appPath;
}
public String getAppNode(String appId) {
private String getAppNode(String appId) {
return getAppNode(appId, 0);
}
public String getAttemptNode(String appId, String attemptId) {
private String getAttemptNode(String appId, String attemptId) {
return getAppNode(appId) + "/" + attemptId;
}
@ -174,10 +176,28 @@ public class TestZKRMStateStore extends RMStateStoreTestBase {
* Emulating retrying createRootDir not to raise NodeExist exception
* @throws Exception
*/
public void testRetryingCreateRootDir() throws Exception {
private void testRetryingCreateRootDir() throws Exception {
create(znodeWorkingPath);
}
private String getDelegationTokenNode(int rmDTSequenceNumber, int splitIdx) {
String rootPath = workingZnode + "/" + ROOT_ZNODE_NAME + "/" +
RM_DT_SECRET_MANAGER_ROOT + "/" +
RMStateStore.RM_DELEGATION_TOKENS_ROOT_ZNODE_NAME;
String nodeName = DELEGATION_TOKEN_PREFIX;
if (splitIdx == 0) {
nodeName += rmDTSequenceNumber;
} else {
nodeName += String.format("%04d", rmDTSequenceNumber);
}
String path = nodeName;
if (splitIdx != 0) {
int idx = nodeName.length() - splitIdx;
path = splitIdx + "/" + nodeName.substring(0, idx) + "/"
+ nodeName.substring(idx);
}
return rootPath + "/" + path;
}
}
private RMStateStore createStore(Configuration conf) throws Exception {
@ -235,6 +255,17 @@ public class TestZKRMStateStore extends RMStateStoreTestBase {
.forPath(store.getAttemptNode(
attemptId.getApplicationId().toString(), attemptId.toString()));
}
public boolean delegationTokenExists(RMDelegationTokenIdentifier token,
int index) throws Exception {
int rmDTSequenceNumber = token.getSequenceNumber();
return curatorFramework.checkExists().forPath(
store.getDelegationTokenNode(rmDTSequenceNumber, index)) != null;
}
public int getDelegationTokenNodeSplitIndex() {
return store.delegationTokenNodeSplitIndex;
}
}
@Test (timeout = 60000)
@ -332,7 +363,8 @@ public class TestZKRMStateStore extends RMStateStoreTestBase {
RMStateStore store = zkTester.getRMStateStore();
Version defaultVersion = zkTester.getCurrentVersion();
store.checkVersion();
Assert.assertEquals(defaultVersion, store.loadVersion());
assertEquals("Store had wrong version",
defaultVersion, store.loadVersion());
}
public static Configuration createHARMConf(String rmIds, String rmId,
@ -546,11 +578,20 @@ public class TestZKRMStateStore extends RMStateStoreTestBase {
new Text("renewer1"), new Text("realuser1"));
Long renewDate1 = new Long(System.currentTimeMillis());
dtId1.setSequenceNumber(1111);
assertFalse("Token " + dtId1
+ " should not exist but was found in ZooKeeper",
zkTester.delegationTokenExists(dtId1, 0));
store.storeRMDelegationToken(dtId1, renewDate1);
assertFalse("Token " + dtId1
+ " should not exist but was found in ZooKeeper",
zkTester.delegationTokenExists(dtId1, 0));
assertEquals("RMStateStore should have been in fenced state", true,
store.isFencedState());
store.updateRMDelegationToken(dtId1, renewDate1);
assertFalse("Token " + dtId1
+ " should not exist but was found in ZooKeeper",
zkTester.delegationTokenExists(dtId1, 0));
assertEquals("RMStateStore should have been in fenced state", true,
store.isFencedState());
@ -606,7 +647,7 @@ public class TestZKRMStateStore extends RMStateStoreTestBase {
try {
store.removeApplicationStateInternal(appStateRemoved);
} catch (KeeperException.NoNodeException nne) {
Assert.fail("NoNodeException should not happen.");
fail("NoNodeException should not happen.");
}
store.close();
}
@ -1134,4 +1175,317 @@ public class TestZKRMStateStore extends RMStateStoreTestBase {
// Close the state store.
store.close();
}
private static Configuration createConfForDelegationTokenNodeSplit(
int splitIndex) {
YarnConfiguration conf = new YarnConfiguration();
conf.setInt(YarnConfiguration.ZK_DELEGATION_TOKEN_NODE_SPLIT_INDEX,
splitIndex);
return conf;
}
private void verifyDelegationTokensStateStore(
TestZKRMStateStoreTester zkTester,
Map<RMDelegationTokenIdentifier, Long> tokensWithRenewal,
Map<RMDelegationTokenIdentifier, Integer> tokensWithIndex,
int sequenceNumber) throws Exception {
RMStateStore.RMDTSecretManagerState secretManagerState =
zkTester.store.loadState().getRMDTSecretManagerState();
assertEquals("Unexpected token state",
tokensWithRenewal, secretManagerState.getTokenState());
assertEquals("Unexpected sequence number", sequenceNumber,
secretManagerState.getDTSequenceNumber());
for (Map.Entry<RMDelegationTokenIdentifier, Integer> tokenEntry
: tokensWithIndex.entrySet()) {
assertTrue("Expected to find token " + tokenEntry.getKey()
+ " in zookeeper but did not",
zkTester.delegationTokenExists(tokenEntry.getKey(),
tokenEntry.getValue()));
}
}
private void verifyDelegationTokenInStateStore(
TestZKRMStateStoreTester zkTester, RMDelegationTokenIdentifier token,
long renewDate, int index) throws Exception {
RMStateStore.RMDTSecretManagerState secretManagerState =
zkTester.store.loadState().getRMDTSecretManagerState();
Map<RMDelegationTokenIdentifier, Long> tokenState =
secretManagerState.getTokenState();
assertTrue("token state does not contain " + token,
tokenState.containsKey(token));
assertTrue("token state does not contain a token with renewal " + renewDate,
tokenState.containsValue(renewDate));
assertTrue("Token " + token + "should exist but was not found in ZooKeeper",
zkTester.delegationTokenExists(token, index));
}
private RMDelegationTokenIdentifier storeUpdateAndVerifyDelegationToken(
TestZKRMStateStoreTester zkTester,
Map<RMDelegationTokenIdentifier, Long> tokensWithRenewal,
Map<RMDelegationTokenIdentifier, Integer> tokensWithIndex,
int sequenceNumber, int split) throws Exception {
// Store token
RMDelegationTokenIdentifier token =
new RMDelegationTokenIdentifier(new Text("owner"),
new Text("renewer"), new Text("realuser"));
assertFalse("Token should not exist but was found in ZooKeeper",
zkTester.delegationTokenExists(token, split));
token.setSequenceNumber(sequenceNumber);
Long renewDate = System.currentTimeMillis();
zkTester.store.storeRMDelegationToken(token, renewDate);
modifyRMDelegationTokenState();
tokensWithRenewal.put(token, renewDate);
tokensWithIndex.put(token, split);
// Verify the token
verifyDelegationTokensStateStore(zkTester, tokensWithRenewal,
tokensWithIndex, sequenceNumber);
// Update the token
renewDate = System.currentTimeMillis();
zkTester.store.updateRMDelegationToken(token, renewDate);
tokensWithRenewal.put(token, renewDate);
tokensWithIndex.put(token, split);
// Verify updates
verifyDelegationTokensStateStore(zkTester, tokensWithRenewal,
tokensWithIndex, sequenceNumber);
return token;
}
@Test
public void testDelegationTokenSplitIndexConfig() throws Exception {
// Valid values
TestZKRMStateStoreTester zkTester = new TestZKRMStateStoreTester();
zkTester.getRMStateStore(createConfForDelegationTokenNodeSplit(0)).close();
assertEquals("Incorrect split index",
0, zkTester.getDelegationTokenNodeSplitIndex());
zkTester.getRMStateStore(createConfForDelegationTokenNodeSplit(1)).close();
assertEquals("Incorrect split index",
1, zkTester.getDelegationTokenNodeSplitIndex());
zkTester.getRMStateStore(createConfForDelegationTokenNodeSplit(2)).close();
assertEquals("Incorrect split index",
2, zkTester.getDelegationTokenNodeSplitIndex());
zkTester.getRMStateStore(createConfForDelegationTokenNodeSplit(3)).close();
assertEquals("Incorrect split index",
3, zkTester.getDelegationTokenNodeSplitIndex());
zkTester.getRMStateStore(createConfForDelegationTokenNodeSplit(4)).close();
assertEquals("Incorrect split index",
4, zkTester.getDelegationTokenNodeSplitIndex());
// Invalid values --> override to 0
zkTester.getRMStateStore(createConfForDelegationTokenNodeSplit(-1)).close();
assertEquals("Incorrect split index",
0, zkTester.getDelegationTokenNodeSplitIndex());
zkTester.getRMStateStore(createConfForDelegationTokenNodeSplit(5)).close();
assertEquals("Incorrect split index",
0, zkTester.getDelegationTokenNodeSplitIndex());
}
@Test
public void testDelegationTokenNodeNoSplit() throws Exception {
testDelegationTokenNode(0);
}
@Test
public void testDelegationTokenNodeWithSplitOne() throws Exception {
testDelegationTokenNode(1);
}
@Test
public void testDelegationTokenNodeWithSplitTwo() throws Exception {
testDelegationTokenNode(2);
}
@Test
public void testDelegationTokenNodeWithSplitThree() throws Exception {
testDelegationTokenNode(3);
}
@Test
public void testDelegationTokenNodeWithSplitFour() throws Exception {
testDelegationTokenNode(4);
}
public void testDelegationTokenNode(int split) throws Exception {
TestZKRMStateStoreTester zkTester = new TestZKRMStateStoreTester();
Configuration conf = createConfForDelegationTokenNodeSplit(split);
RMStateStore store = zkTester.getRMStateStore(conf);
// Store the token and verify
Map<RMDelegationTokenIdentifier, Long> tokensWithRenewal = new HashMap<>();
Map<RMDelegationTokenIdentifier, Integer> tokensWithIndex = new HashMap<>();
int sequenceNumber = 0;
RMDelegationTokenIdentifier token = storeUpdateAndVerifyDelegationToken(
zkTester, tokensWithRenewal, tokensWithIndex, sequenceNumber, split);
// Delete the token and verify
store.removeRMDelegationToken(token);
RMStateStore.RMDTSecretManagerState state =
store.loadState().getRMDTSecretManagerState();
tokensWithRenewal.clear();
tokensWithIndex.clear();
assertEquals("Unexpected token state",
tokensWithRenewal, state.getTokenState());
assertEquals("Unexpected sequence number",
sequenceNumber, state.getDTSequenceNumber());
assertFalse("Token should not exist but was found in ZooKeeper",
zkTester.delegationTokenExists(token, split));
store.close();
}
@Test
public void testDelegationTokenNodeWithSplitMultiple() throws Exception {
TestZKRMStateStoreTester zkTester = new TestZKRMStateStoreTester();
Configuration conf = createConfForDelegationTokenNodeSplit(1);
RMStateStore store = zkTester.getRMStateStore(conf);
// With the split set to 1, we can store 10 tokens under a znode (i.e. 0-9)
// Try to store more than 10
Map<RMDelegationTokenIdentifier, Long> tokensWithRenewal = new HashMap<>();
Map<RMDelegationTokenIdentifier, Integer> tokensWithIndex = new HashMap<>();
Set<RMDelegationTokenIdentifier> tokensToDelete = new HashSet<>();
int sequenceNumber = 0;
for (int i = 0; i <= 12; i++) {
RMDelegationTokenIdentifier token =
new RMDelegationTokenIdentifier(new Text("owner" + i),
new Text("renewer" + i), new Text("realuser" + i));
sequenceNumber = i;
token.setSequenceNumber(sequenceNumber);
assertFalse("Token should not exist but was found in ZooKeeper",
zkTester.delegationTokenExists(token, 1));
Long renewDate = System.currentTimeMillis();
store.storeRMDelegationToken(token, renewDate);
modifyRMDelegationTokenState();
tokensWithRenewal.put(token, renewDate);
tokensWithIndex.put(token, 1);
switch (i) {
case 0:
case 3:
case 6:
case 11:
tokensToDelete.add(token);
break;
default:
break;
}
}
// Verify
verifyDelegationTokensStateStore(zkTester, tokensWithRenewal,
tokensWithIndex, sequenceNumber);
// Try deleting some tokens and adding some new ones
for (RMDelegationTokenIdentifier tokenToDelete : tokensToDelete) {
store.removeRMDelegationToken(tokenToDelete);
tokensWithRenewal.remove(tokenToDelete);
tokensWithIndex.remove(tokenToDelete);
}
for (int i = 13; i <= 22; i++) {
RMDelegationTokenIdentifier token =
new RMDelegationTokenIdentifier(new Text("owner" + i),
new Text("renewer" + i), new Text("realuser" + i));
sequenceNumber = i;
token.setSequenceNumber(sequenceNumber);
Long renewDate = System.currentTimeMillis();
store.storeRMDelegationToken(token, renewDate);
modifyRMDelegationTokenState();
tokensWithRenewal.put(token, renewDate);
tokensWithIndex.put(token, 1);
}
// Verify
verifyDelegationTokensStateStore(zkTester, tokensWithRenewal,
tokensWithIndex, sequenceNumber);
for (RMDelegationTokenIdentifier token : tokensToDelete) {
assertFalse("Token " + token
+ " should not exist but was found in ZooKeeper",
zkTester.delegationTokenExists(token, 1));
}
store.close();
}
@Test
public void testDelegationTokenNodeWithSplitChangeAcrossRestarts()
throws Exception {
TestZKRMStateStoreTester zkTester = new TestZKRMStateStoreTester();
Map<RMDelegationTokenIdentifier, Long> tokensWithRenewal = new HashMap<>();
Map<RMDelegationTokenIdentifier, Integer> tokensWithIndex = new HashMap<>();
int sequenceNumber = 0;
// Start the store with index 1
Configuration conf = createConfForDelegationTokenNodeSplit(1);
RMStateStore store = zkTester.getRMStateStore(conf);
// Store a token with index 1
RMDelegationTokenIdentifier token1 = storeUpdateAndVerifyDelegationToken(
zkTester, tokensWithRenewal, tokensWithIndex, sequenceNumber, 1);
store.close();
// Start the store with index 2
conf = createConfForDelegationTokenNodeSplit(2);
store = zkTester.getRMStateStore(conf);
// Verify token1 is still there and under the /1/ znode
verifyDelegationTokenInStateStore(
zkTester, token1, tokensWithRenewal.get(token1), 1);
// Store a token with index 2
sequenceNumber++;
RMDelegationTokenIdentifier token2 = storeUpdateAndVerifyDelegationToken(
zkTester, tokensWithRenewal, tokensWithIndex, sequenceNumber, 2);
// Update and verify token1
long renewDate1 = System.currentTimeMillis();
zkTester.store.updateRMDelegationToken(token1, renewDate1);
tokensWithRenewal.put(token1, renewDate1);
verifyDelegationTokenInStateStore(
zkTester, token1, tokensWithRenewal.get(token1), 1);
store.close();
// Start the store with index 0
conf = createConfForDelegationTokenNodeSplit(0);
store = zkTester.getRMStateStore(conf);
// Verify token1 is still there and under the /1/ znode
verifyDelegationTokenInStateStore(
zkTester, token1, tokensWithRenewal.get(token1), 1);
// Verify token2 is still there and under the /2/ znode
verifyDelegationTokenInStateStore(
zkTester, token2, tokensWithRenewal.get(token2), 2);
// Store a token with no index
sequenceNumber++;
RMDelegationTokenIdentifier token0 = storeUpdateAndVerifyDelegationToken(
zkTester, tokensWithRenewal, tokensWithIndex, sequenceNumber, 0);
store.close();
// Start the store with index 3
conf = createConfForDelegationTokenNodeSplit(3);
store = zkTester.getRMStateStore(conf);
// Verify token1 is still there and under the /1/ znode
verifyDelegationTokenInStateStore(
zkTester, token1, tokensWithRenewal.get(token1), 1);
// Verify token2 is still there and under the /2/ znode
verifyDelegationTokenInStateStore(
zkTester, token2, tokensWithRenewal.get(token2), 2);
// Verify token0 is still there and under the token root node
verifyDelegationTokenInStateStore(
zkTester, token0, tokensWithRenewal.get(token0), 0);
// Delete all tokens and verify
for (RMDelegationTokenIdentifier token : tokensWithRenewal.keySet()) {
store.removeRMDelegationToken(token);
}
tokensWithRenewal.clear();
tokensWithIndex.clear();
verifyDelegationTokensStateStore(
zkTester, tokensWithRenewal, tokensWithIndex, sequenceNumber);
assertFalse("Token " + token1
+ " should not exist but was found in ZooKeeper",
zkTester.delegationTokenExists(token1, 1));
assertFalse("Token " + token1
+ " should not exist but was found in ZooKeeper",
zkTester.delegationTokenExists(token2, 2));
assertFalse("Token " + token1
+ " should not exist but was found in ZooKeeper",
zkTester.delegationTokenExists(token0, 0));
// Store a token with index 3
sequenceNumber++;
storeUpdateAndVerifyDelegationToken(zkTester, tokensWithRenewal,
tokensWithIndex, sequenceNumber, 3);
store.close();
}
}