From 7c26ae506af97254adeaad9642cbfb519c6865be Mon Sep 17 00:00:00 2001 From: Robert Kanter Date: Thu, 26 Oct 2017 17:47:32 -0700 Subject: [PATCH] YARN-7262. Add a hierarchy into the ZKRMStateStore for delegation token znodes to prevent jute buffer overflow (rkanter) (cherry picked from commit b1de78619f3e5e25d6f9d5eaf41925f22d212fb9) --- .../hadoop/yarn/conf/YarnConfiguration.java | 7 + .../src/main/resources/yarn-default.xml | 18 + .../recovery/RMStateStore.java | 2 + .../recovery/ZKRMStateStore.java | 411 ++++++++++++------ .../recovery/TestZKRMStateStore.java | 372 +++++++++++++++- 5 files changed, 673 insertions(+), 137 deletions(-) diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java index 69dd080a948..96f6c57a855 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java @@ -610,6 +610,13 @@ public class YarnConfiguration extends Configuration { RM_ZK_PREFIX + "appid-node.split-index"; public static final int DEFAULT_ZK_APPID_NODE_SPLIT_INDEX = 0; + /** Index at which the RM Delegation Token ids will be split so that the + * delegation token znodes stored in the zookeeper RM state store will be + * stored as two different znodes (parent-child). **/ + public static final String ZK_DELEGATION_TOKEN_NODE_SPLIT_INDEX = + RM_ZK_PREFIX + "delegation-token-node.split-index"; + public static final int DEFAULT_ZK_DELEGATION_TOKEN_NODE_SPLIT_INDEX = 0; + public static final String RM_ZK_ACL = RM_ZK_PREFIX + "acl"; public static final String DEFAULT_RM_ZK_ACL = "world:anyone:rwcda"; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml index a5f56986029..937b7b0b4fb 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml @@ -603,6 +603,24 @@ 0 + + Index at which the RM Delegation Token ids will be split so + that the delegation token znodes stored in the zookeeper RM state store + will be stored as two different znodes (parent-child). The split is done + from the end. For instance, with no split, a delegation token znode will + be of the form RMDelegationToken_123456789. If the value of this config is + 1, the delegation token znode will be broken into two parts: + RMDelegationToken_12345678 and 9 respectively with former being the parent + node. This config can take values from 0 to 4. 0 means there will be no + split. If the value is outside this range, it will be treated as 0 (i.e. + no split). A value larger than 0 (up to 4) should be configured if you are + running a large number of applications, with long-lived delegation tokens + and state store operations (e.g. failover) are failing due to LenError in + Zookeeper. + yarn.resourcemanager.zk-delegation-token-node.split-index + 0 + + Specifies the maximum size of the data that can be stored in a znode. Value should be same or less than jute.maxbuffer configured diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java index 35340e62a22..e8ed0b7ee65 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java @@ -89,6 +89,8 @@ public abstract class RMStateStore extends AbstractService { @VisibleForTesting public static final String RM_APP_ROOT = "RMAppRoot"; protected static final String RM_DT_SECRET_MANAGER_ROOT = "RMDTSecretManagerRoot"; + protected static final String RM_DELEGATION_TOKENS_ROOT_ZNODE_NAME = + "RMDelegationTokensRoot"; protected static final String DELEGATION_KEY_PREFIX = "DelegationKey_"; protected static final String DELEGATION_TOKEN_PREFIX = "RMDelegationToken_"; protected static final String DELEGATION_TOKEN_SEQUENCE_NUMBER_PREFIX = diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/ZKRMStateStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/ZKRMStateStore.java index 473dfb9e7da..c4c482a3396 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/ZKRMStateStore.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/ZKRMStateStore.java @@ -118,6 +118,22 @@ import java.util.Set; * |--- RM_DT_SECRET_MANAGER_ROOT * |----- RM_DT_SEQUENTIAL_NUMBER_ZNODE_NAME * |----- RM_DELEGATION_TOKENS_ROOT_ZNODE_NAME + * | |----- 1 + * | | |----- (#TokenId barring last character) + * | | | |----- (#Last character of TokenId) + * | | .... + * | |----- 2 + * | | |----- (#TokenId barring last 2 characters) + * | | | |----- (#Last 2 characters of TokenId) + * | | .... + * | |----- 3 + * | | |----- (#TokenId barring last 3 characters) + * | | | |----- (#Last 3 characters of TokenId) + * | | .... + * | |----- 4 + * | | |----- (#TokenId barring last 4 characters) + * | | | |----- (#Last 4 characters of TokenId) + * | | .... * | |----- Token_1 * | |----- Token_2 * | .... @@ -147,6 +163,11 @@ import java.util.Set; * splitting it in 2 parts, depending on a configurable split index. This limits * the number of application znodes returned in a single call while loading * app state. + * + * Changes from 1.4 to 1.5 - Change the structure of delegation token znode by + * splitting it in 2 parts, depending on a configurable split index. This limits + * the number of delegation token znodes returned in a single call while loading + * tokens state. */ @Private @Unstable @@ -162,7 +183,7 @@ public class ZKRMStateStore extends RMStateStore { @VisibleForTesting public static final String ROOT_ZNODE_NAME = "ZKRMStateRoot"; protected static final Version CURRENT_VERSION_INFO = Version - .newInstance(1, 4); + .newInstance(1, 5); @VisibleForTesting public static final String RM_APP_ROOT_HIERARCHIES = "HIERARCHIES"; @@ -170,6 +191,7 @@ public class ZKRMStateStore extends RMStateStore { private String zkRootNodePath; private String rmAppRoot; private Map rmAppRootHierarchies; + private Map rmDelegationTokenHierarchies; private String rmDTSecretManagerRoot; private String dtMasterKeysRootPath; private String delegationTokensRootPath; @@ -180,6 +202,8 @@ public class ZKRMStateStore extends RMStateStore { @VisibleForTesting protected String znodeWorkingPath; private int appIdNodeSplitIndex = 0; + @VisibleForTesting + protected int delegationTokenNodeSplitIndex = 0; /* Fencing related variables */ private static final String FENCING_LOCK = "RM_ZK_FENCING_LOCK"; @@ -212,12 +236,13 @@ public class ZKRMStateStore extends RMStateStore { }; /** - * Encapsulates full app node path and corresponding split index. + * Encapsulates znode path and corresponding split index for hierarchical + * znode layouts. */ - private final static class AppNodeSplitInfo { + private final static class ZnodeSplitInfo { private final String path; private final int splitIndex; - AppNodeSplitInfo(String path, int splitIndex) { + ZnodeSplitInfo(String path, int splitIndex) { this.path = path; this.splitIndex = splitIndex; } @@ -288,7 +313,7 @@ public class ZKRMStateStore extends RMStateStore { appIdNodeSplitIndex = conf.getInt(YarnConfiguration.ZK_APPID_NODE_SPLIT_INDEX, YarnConfiguration.DEFAULT_ZK_APPID_NODE_SPLIT_INDEX); - if (appIdNodeSplitIndex < 1 || appIdNodeSplitIndex > 4) { + if (appIdNodeSplitIndex < 0 || appIdNodeSplitIndex > 4) { LOG.info("Invalid value " + appIdNodeSplitIndex + " for config " + YarnConfiguration.ZK_APPID_NODE_SPLIT_INDEX + " specified. " + "Resetting it to " + @@ -322,12 +347,30 @@ public class ZKRMStateStore extends RMStateStore { RM_DT_MASTER_KEYS_ROOT_ZNODE_NAME); delegationTokensRootPath = getNodePath(rmDTSecretManagerRoot, RM_DELEGATION_TOKENS_ROOT_ZNODE_NAME); + rmDelegationTokenHierarchies = new HashMap<>(5); + rmDelegationTokenHierarchies.put(0, delegationTokensRootPath); + for (int splitIndex = 1; splitIndex <= 4; splitIndex++) { + rmDelegationTokenHierarchies.put(splitIndex, + getNodePath(delegationTokensRootPath, Integer.toString(splitIndex))); + } dtSequenceNumberPath = getNodePath(rmDTSecretManagerRoot, RM_DT_SEQUENTIAL_NUMBER_ZNODE_NAME); amrmTokenSecretManagerRoot = getNodePath(zkRootNodePath, AMRMTOKEN_SECRET_MANAGER_ROOT); reservationRoot = getNodePath(zkRootNodePath, RESERVATION_SYSTEM_ROOT); zkManager = resourceManager.getAndStartZKManager(conf); + delegationTokenNodeSplitIndex = + conf.getInt(YarnConfiguration.ZK_DELEGATION_TOKEN_NODE_SPLIT_INDEX, + YarnConfiguration.DEFAULT_ZK_DELEGATION_TOKEN_NODE_SPLIT_INDEX); + if (delegationTokenNodeSplitIndex < 0 + || delegationTokenNodeSplitIndex > 4) { + LOG.info("Invalid value " + delegationTokenNodeSplitIndex + " for config " + + YarnConfiguration.ZK_DELEGATION_TOKEN_NODE_SPLIT_INDEX + + " specified. Resetting it to " + + YarnConfiguration.DEFAULT_ZK_DELEGATION_TOKEN_NODE_SPLIT_INDEX); + delegationTokenNodeSplitIndex = + YarnConfiguration.DEFAULT_ZK_DELEGATION_TOKEN_NODE_SPLIT_INDEX; + } } @Override @@ -350,6 +393,9 @@ public class ZKRMStateStore extends RMStateStore { create(rmDTSecretManagerRoot); create(dtMasterKeysRootPath); create(delegationTokensRootPath); + for (int splitIndex = 1; splitIndex <= 4; splitIndex++) { + create(rmDelegationTokenHierarchies.get(splitIndex)); + } create(dtSequenceNumberPath); create(amrmTokenSecretManagerRoot); create(reservationRoot); @@ -573,36 +619,63 @@ public class ZKRMStateStore extends RMStateStore { } private void loadRMDelegationTokenState(RMState rmState) throws Exception { - List childNodes = - getChildren(delegationTokensRootPath); - - for (String childNodeName : childNodes) { - String childNodePath = - getNodePath(delegationTokensRootPath, childNodeName); - byte[] childData = getData(childNodePath); - - if (childData == null) { - LOG.warn("Content of " + childNodePath + " is broken."); + for (int splitIndex = 0; splitIndex <= 4; splitIndex++) { + String tokenRoot = rmDelegationTokenHierarchies.get(splitIndex); + if (tokenRoot == null) { continue; } - - ByteArrayInputStream is = new ByteArrayInputStream(childData); - - try (DataInputStream fsIn = new DataInputStream(is)) { + List childNodes = getChildren(tokenRoot); + boolean dtNodeFound = false; + for (String childNodeName : childNodes) { if (childNodeName.startsWith(DELEGATION_TOKEN_PREFIX)) { - RMDelegationTokenIdentifierData identifierData = - new RMDelegationTokenIdentifierData(); - identifierData.readFields(fsIn); - RMDelegationTokenIdentifier identifier = - identifierData.getTokenIdentifier(); - long renewDate = identifierData.getRenewDate(); - rmState.rmSecretManagerState.delegationTokenState.put(identifier, - renewDate); - - if (LOG.isDebugEnabled()) { - LOG.debug("Loaded RMDelegationTokenIdentifier: " + identifier - + " renewDate=" + renewDate); + dtNodeFound = true; + String parentNodePath = getNodePath(tokenRoot, childNodeName); + if (splitIndex == 0) { + loadDelegationTokenFromNode(rmState, parentNodePath); + } else { + // If znode is partitioned. + List leafNodes = getChildren(parentNodePath); + for (String leafNodeName : leafNodes) { + loadDelegationTokenFromNode(rmState, + getNodePath(parentNodePath, leafNodeName)); + } } + } else if (splitIndex == 0 + && !(childNodeName.equals("1") || childNodeName.equals("2") + || childNodeName.equals("3") || childNodeName.equals("4"))) { + LOG.debug("Unknown child node with name " + childNodeName + " under" + + tokenRoot); + } + } + if (splitIndex != delegationTokenNodeSplitIndex && !dtNodeFound) { + // If no loaded delegation token exists for a particular split index and + // the split index for which tokens are being loaded is not the one + // configured, then we do not need to keep track of this hierarchy for + // storing/updating/removing delegation token znodes. + rmDelegationTokenHierarchies.remove(splitIndex); + } + } + } + + private void loadDelegationTokenFromNode(RMState rmState, String path) + throws Exception { + byte[] data = getData(path); + if (data == null) { + LOG.warn("Content of " + path + " is broken."); + } else { + ByteArrayInputStream is = new ByteArrayInputStream(data); + try (DataInputStream fsIn = new DataInputStream(is)) { + RMDelegationTokenIdentifierData identifierData = + new RMDelegationTokenIdentifierData(); + identifierData.readFields(fsIn); + RMDelegationTokenIdentifier identifier = + identifierData.getTokenIdentifier(); + long renewDate = identifierData.getRenewDate(); + rmState.rmSecretManagerState.delegationTokenState.put(identifier, + renewDate); + if (LOG.isDebugEnabled()) { + LOG.debug("Loaded RMDelegationTokenIdentifier: " + identifier + + " renewDate=" + renewDate); } } } @@ -650,8 +723,9 @@ public class ZKRMStateStore extends RMStateStore { getNodePath(parentNodePath, leafNodeName), appIdStr); } } - } else { - LOG.info("Unknown child node with name: " + childNodeName); + } else if (!childNodeName.equals(RM_APP_ROOT_HIERARCHIES)){ + LOG.debug("Unknown child node with name " + childNodeName + " under" + + appRoot); } } if (splitIndex != appIdNodeSplitIndex && !appNodeFound) { @@ -684,36 +758,36 @@ public class ZKRMStateStore extends RMStateStore { } /** - * Get parent app node path based on full path and split index supplied. - * @param appIdPath App id path for which parent needs to be returned. + * Get znode path based on full path and split index supplied. + * @param path path for which parent needs to be returned. * @param splitIndex split index. * @return parent app node path. */ - private String getSplitAppNodeParent(String appIdPath, int splitIndex) { - // Calculated as string upto index (appIdPath Length - split index - 1). We + private String getSplitZnodeParent(String path, int splitIndex) { + // Calculated as string up to index (path Length - split index - 1). We // deduct 1 to exclude path separator. - return appIdPath.substring(0, appIdPath.length() - splitIndex - 1); + return path.substring(0, path.length() - splitIndex - 1); } /** - * Checks if parent app node has no leaf nodes and if it does not have, - * removes it. Called while removing application. - * @param appIdPath path of app id to be removed. + * Checks if parent znode has no leaf nodes and if it does not have, + * removes it. + * @param path path of znode to be removed. * @param splitIndex split index. * @throws Exception if any problem occurs while performing ZK operation. */ - private void checkRemoveParentAppNode(String appIdPath, int splitIndex) + private void checkRemoveParentZnode(String path, int splitIndex) throws Exception { if (splitIndex != 0) { - String parentAppNode = getSplitAppNodeParent(appIdPath, splitIndex); + String parentZnode = getSplitZnodeParent(path, splitIndex); List children = null; try { - children = getChildren(parentAppNode); + children = getChildren(parentZnode); } catch (KeeperException.NoNodeException ke) { - // It should be fine to swallow this exception as the parent app node we + // It should be fine to swallow this exception as the parent znode we // intend to delete is already deleted. if (LOG.isDebugEnabled()) { - LOG.debug("Unable to remove app parent node " + parentAppNode + + LOG.debug("Unable to remove parent node " + parentZnode + " as it does not exist."); } return; @@ -721,16 +795,16 @@ public class ZKRMStateStore extends RMStateStore { // No apps stored under parent path. if (children != null && children.isEmpty()) { try { - zkManager.safeDelete(parentAppNode, zkAcl, fencingNodePath); + zkManager.safeDelete(parentZnode, zkAcl, fencingNodePath); if (LOG.isDebugEnabled()) { - LOG.debug("No leaf app node exists. Removing parent node " + - parentAppNode); + LOG.debug("No leaf znode exists. Removing parent node " + + parentZnode); } } catch (KeeperException.NotEmptyException ke) { - // It should be fine to swallow this exception as the parent app node + // It should be fine to swallow this exception as the parent znode // has to be deleted only if it has no children. And this node has. if (LOG.isDebugEnabled()) { - LOG.debug("Unable to remove app parent node " + parentAppNode + + LOG.debug("Unable to remove app parent node " + parentZnode + " as it has children."); } } @@ -771,7 +845,7 @@ public class ZKRMStateStore extends RMStateStore { // Look for paths based on other split indices if path as per split index // does not exist. if (!exists(nodeUpdatePath)) { - AppNodeSplitInfo alternatePathInfo = getAlternatePath(appId.toString()); + ZnodeSplitInfo alternatePathInfo = getAlternateAppPath(appId.toString()); if (alternatePathInfo != null) { nodeUpdatePath = alternatePathInfo.path; } else { @@ -779,7 +853,7 @@ public class ZKRMStateStore extends RMStateStore { pathExists = false; if (appIdNodeSplitIndex != 0) { String rootNode = - getSplitAppNodeParent(nodeUpdatePath, appIdNodeSplitIndex); + getSplitZnodeParent(nodeUpdatePath, appIdNodeSplitIndex); if (!exists(rootNode)) { zkManager.safeCreate(rootNode, null, zkAcl, CreateMode.PERSISTENT, zkAcl, fencingNodePath); @@ -820,7 +894,7 @@ public class ZKRMStateStore extends RMStateStore { String appDirPath = getLeafAppIdNodePath(appId, false); // Look for paths based on other split indices. if (!exists(appDirPath)) { - AppNodeSplitInfo alternatePathInfo = getAlternatePath(appId); + ZnodeSplitInfo alternatePathInfo = getAlternateAppPath(appId); if (alternatePathInfo == null) { if (operation == AppAttemptOp.REMOVE) { // Unexpected. Assume that app attempt has been deleted. @@ -919,7 +993,7 @@ public class ZKRMStateStore extends RMStateStore { // Look for paths based on other split indices if path as per configured // split index does not exist. if (!exists(appIdRemovePath)) { - AppNodeSplitInfo alternatePathInfo = getAlternatePath(removeAppId); + ZnodeSplitInfo alternatePathInfo = getAlternateAppPath(removeAppId); if (alternatePathInfo != null) { appIdRemovePath = alternatePathInfo.path; splitIndex = alternatePathInfo.splitIndex; @@ -947,24 +1021,60 @@ public class ZKRMStateStore extends RMStateStore { forPath(appIdRemovePath); } // Check if we should remove the parent app node as well. - checkRemoveParentAppNode(appIdRemovePath, splitIndex); + checkRemoveParentZnode(appIdRemovePath, splitIndex); } @Override protected synchronized void storeRMDelegationTokenState( RMDelegationTokenIdentifier rmDTIdentifier, Long renewDate) throws Exception { - SafeTransaction trx = zkManager.createTransaction(zkAcl, fencingNodePath); - addStoreOrUpdateOps(trx, rmDTIdentifier, renewDate, false); - trx.commit(); + String nodeCreatePath = getLeafDelegationTokenNodePath( + rmDTIdentifier.getSequenceNumber(), true); + if (LOG.isDebugEnabled()) { + LOG.debug("Storing " + DELEGATION_TOKEN_PREFIX + + rmDTIdentifier.getSequenceNumber()); + } + + RMDelegationTokenIdentifierData identifierData = + new RMDelegationTokenIdentifierData(rmDTIdentifier, renewDate); + ByteArrayOutputStream seqOs = new ByteArrayOutputStream(); + try (DataOutputStream seqOut = new DataOutputStream(seqOs)) { + SafeTransaction trx = zkManager.createTransaction(zkAcl, + fencingNodePath); + trx.create(nodeCreatePath, identifierData.toByteArray(), zkAcl, + CreateMode.PERSISTENT); + // Update Sequence number only while storing DT + seqOut.writeInt(rmDTIdentifier.getSequenceNumber()); + + if (LOG.isDebugEnabled()) { + LOG.debug("Storing " + dtSequenceNumberPath + ". SequenceNumber: " + + rmDTIdentifier.getSequenceNumber()); + } + + trx.setData(dtSequenceNumberPath, seqOs.toByteArray(), -1); + trx.commit(); + } } @Override protected synchronized void removeRMDelegationTokenState( RMDelegationTokenIdentifier rmDTIdentifier) throws Exception { - String nodeRemovePath = - getNodePath(delegationTokensRootPath, DELEGATION_TOKEN_PREFIX - + rmDTIdentifier.getSequenceNumber()); + String nodeRemovePath = getLeafDelegationTokenNodePath( + rmDTIdentifier.getSequenceNumber(), false); + int splitIndex = delegationTokenNodeSplitIndex; + // Look for paths based on other split indices if path as per configured + // split index does not exist. + if (!exists(nodeRemovePath)) { + ZnodeSplitInfo alternatePathInfo = + getAlternateDTPath(rmDTIdentifier.getSequenceNumber()); + if (alternatePathInfo != null) { + nodeRemovePath = alternatePathInfo.path; + splitIndex = alternatePathInfo.splitIndex; + } else { + // Alternate path not found so return. + return; + } + } if (LOG.isDebugEnabled()) { LOG.debug("Removing RMDelegationToken_" @@ -972,62 +1082,41 @@ public class ZKRMStateStore extends RMStateStore { } zkManager.safeDelete(nodeRemovePath, zkAcl, fencingNodePath); + + // Check if we should remove the parent app node as well. + checkRemoveParentZnode(nodeRemovePath, splitIndex); } @Override protected synchronized void updateRMDelegationTokenState( RMDelegationTokenIdentifier rmDTIdentifier, Long renewDate) throws Exception { - SafeTransaction trx = zkManager.createTransaction(zkAcl, fencingNodePath); - String nodeRemovePath = - getNodePath(delegationTokensRootPath, DELEGATION_TOKEN_PREFIX - + rmDTIdentifier.getSequenceNumber()); - - if (exists(nodeRemovePath)) { - // in case znode exists - addStoreOrUpdateOps(trx, rmDTIdentifier, renewDate, true); - } else { - // in case znode doesn't exist - addStoreOrUpdateOps(trx, rmDTIdentifier, renewDate, false); - if (LOG.isDebugEnabled()) { - LOG.debug("Attempted to update a non-existing znode " + nodeRemovePath); + String nodeUpdatePath = getLeafDelegationTokenNodePath( + rmDTIdentifier.getSequenceNumber(), false); + boolean pathExists = true; + // Look for paths based on other split indices if path as per split index + // does not exist. + if (!exists(nodeUpdatePath)) { + ZnodeSplitInfo alternatePathInfo = + getAlternateDTPath(rmDTIdentifier.getSequenceNumber()); + if (alternatePathInfo != null) { + nodeUpdatePath = alternatePathInfo.path; + } else { + pathExists = false; } } - trx.commit(); - } - - private void addStoreOrUpdateOps(SafeTransaction trx, - RMDelegationTokenIdentifier rmDTIdentifier, Long renewDate, - boolean isUpdate) throws Exception { - // store RM delegation token - String nodeCreatePath = getNodePath(delegationTokensRootPath, - DELEGATION_TOKEN_PREFIX + rmDTIdentifier.getSequenceNumber()); - RMDelegationTokenIdentifierData identifierData = - new RMDelegationTokenIdentifierData(rmDTIdentifier, renewDate); - ByteArrayOutputStream seqOs = new ByteArrayOutputStream(); - - try (DataOutputStream seqOut = new DataOutputStream(seqOs)) { - - if (isUpdate) { - if (LOG.isDebugEnabled()) { - LOG.debug("Updating RMDelegationToken_" - + rmDTIdentifier.getSequenceNumber()); - } - trx.setData(nodeCreatePath, identifierData.toByteArray(), -1); - } else { - trx.create(nodeCreatePath, identifierData.toByteArray(), zkAcl, - CreateMode.PERSISTENT); - // Update Sequence number only while storing DT - seqOut.writeInt(rmDTIdentifier.getSequenceNumber()); - - if (LOG.isDebugEnabled()) { - LOG.debug("Storing " + dtSequenceNumberPath + ". SequenceNumber: " - + rmDTIdentifier.getSequenceNumber()); - } - - trx.setData(dtSequenceNumberPath, seqOs.toByteArray(), -1); + if (pathExists) { + if (LOG.isDebugEnabled()) { + LOG.debug("Updating " + DELEGATION_TOKEN_PREFIX + + rmDTIdentifier.getSequenceNumber()); } + RMDelegationTokenIdentifierData identifierData = + new RMDelegationTokenIdentifierData(rmDTIdentifier, renewDate); + zkManager.safeSetData(nodeUpdatePath, identifierData.toByteArray(), -1, + zkAcl, fencingNodePath); + } else { + storeRMDelegationTokenState(rmDTIdentifier, renewDate); } } @@ -1157,19 +1246,19 @@ public class ZKRMStateStore extends RMStateStore { * Get alternate path for app id if path according to configured split index * does not exist. We look for path based on all possible split indices. * @param appId - * @return a {@link AppNodeSplitInfo} object containing the path and split + * @return a {@link ZnodeSplitInfo} object containing the path and split * index if it exists, null otherwise. * @throws Exception if any problem occurs while performing ZK operation. */ - private AppNodeSplitInfo getAlternatePath(String appId) throws Exception { + private ZnodeSplitInfo getAlternateAppPath(String appId) throws Exception { for (Map.Entry entry : rmAppRootHierarchies.entrySet()) { // Look for other paths int splitIndex = entry.getKey(); if (splitIndex != appIdNodeSplitIndex) { String alternatePath = - getLeafAppIdNodePath(appId, entry.getValue(), splitIndex, false); + getLeafZnodePath(appId, entry.getValue(), splitIndex, false); if (exists(alternatePath)) { - return new AppNodeSplitInfo(alternatePath, splitIndex); + return new ZnodeSplitInfo(alternatePath, splitIndex); } } } @@ -1177,26 +1266,25 @@ public class ZKRMStateStore extends RMStateStore { } /** - * Returns leaf app node path based on app id and passed split index. If the - * passed flag createParentIfNotExists is true, also creates the parent app - * node if it does not exist. - * @param appId application id. + * Returns leaf znode path based on node name and passed split index. If the + * passed flag createParentIfNotExists is true, also creates the parent znode + * if it does not exist. + * @param nodeName the node name. * @param rootNode app root node based on split index. - * @param appIdNodeSplitIdx split index. - * @param createParentIfNotExists flag which determines if parent app node + * @param splitIdx split index. + * @param createParentIfNotExists flag which determines if parent znode * needs to be created(as per split) if it does not exist. - * @return leaf app node path. + * @return leaf znode path. * @throws Exception if any problem occurs while performing ZK operation. */ - private String getLeafAppIdNodePath(String appId, String rootNode, - int appIdNodeSplitIdx, boolean createParentIfNotExists) throws Exception { - if (appIdNodeSplitIdx == 0) { - return getNodePath(rootNode, appId); + private String getLeafZnodePath(String nodeName, String rootNode, + int splitIdx, boolean createParentIfNotExists) throws Exception { + if (splitIdx == 0) { + return getNodePath(rootNode, nodeName); } - String nodeName = appId; - int splitIdx = nodeName.length() - appIdNodeSplitIdx; + int split = nodeName.length() - splitIdx; String rootNodePath = - getNodePath(rootNode, nodeName.substring(0, splitIdx)); + getNodePath(rootNode, nodeName.substring(0, split)); if (createParentIfNotExists && !exists(rootNodePath)) { try { zkManager.safeCreate(rootNodePath, null, zkAcl, CreateMode.PERSISTENT, @@ -1208,7 +1296,7 @@ public class ZKRMStateStore extends RMStateStore { } } } - return getNodePath(rootNodePath, nodeName.substring(splitIdx)); + return getNodePath(rootNodePath, nodeName.substring(split)); } /** @@ -1223,10 +1311,77 @@ public class ZKRMStateStore extends RMStateStore { */ private String getLeafAppIdNodePath(String appId, boolean createParentIfNotExists) throws Exception { - return getLeafAppIdNodePath(appId, rmAppRootHierarchies.get( + return getLeafZnodePath(appId, rmAppRootHierarchies.get( appIdNodeSplitIndex), appIdNodeSplitIndex, createParentIfNotExists); } + /** + * Returns leaf delegation token node path based on sequence number and + * configured split index. If the passed flag createParentIfNotExists is true, + * also creates the parent znode if it does not exist. The sequence number + * is padded to be at least 4 digits wide to ensure consistency with the split + * indexing. + * @param rmDTSequenceNumber delegation token sequence number. + * @param createParentIfNotExists flag which determines if parent znode + * needs to be created(as per split) if it does not exist. + * @return leaf delegation token node path. + * @throws Exception if any problem occurs while performing ZK operation. + */ + private String getLeafDelegationTokenNodePath(int rmDTSequenceNumber, + boolean createParentIfNotExists) throws Exception { + return getLeafDelegationTokenNodePath(rmDTSequenceNumber, + createParentIfNotExists, delegationTokenNodeSplitIndex); + } + + /** + * Returns leaf delegation token node path based on sequence number and + * passed split index. If the passed flag createParentIfNotExists is true, + * also creates the parent znode if it does not exist. The sequence number + * is padded to be at least 4 digits wide to ensure consistency with the split + * indexing. + * @param rmDTSequenceNumber delegation token sequence number. + * @param createParentIfNotExists flag which determines if parent znode + * needs to be created(as per split) if it does not exist. + * @param split the split index to use + * @return leaf delegation token node path. + * @throws Exception if any problem occurs while performing ZK operation. + */ + private String getLeafDelegationTokenNodePath(int rmDTSequenceNumber, + boolean createParentIfNotExists, int split) throws Exception { + String nodeName = DELEGATION_TOKEN_PREFIX; + if (split == 0) { + nodeName += rmDTSequenceNumber; + } else { + nodeName += String.format("%04d", rmDTSequenceNumber); + } + return getLeafZnodePath(nodeName, rmDelegationTokenHierarchies.get(split), + split, createParentIfNotExists); + } + + /** + * Get alternate path for delegation token if path according to configured + * split index does not exist. We look for path based on all possible split + * indices. + * @param rmDTSequenceNumber delegation token sequence number. + * @return a {@link ZnodeSplitInfo} object containing the path and split + * index if it exists, null otherwise. + * @throws Exception if any problem occurs while performing ZK operation. + */ + private ZnodeSplitInfo getAlternateDTPath(int rmDTSequenceNumber) + throws Exception { + // Check all possible paths until we find it + for (int splitIndex : rmDelegationTokenHierarchies.keySet()) { + if (splitIndex != delegationTokenNodeSplitIndex) { + String alternatePath = getLeafDelegationTokenNodePath( + rmDTSequenceNumber, false, splitIndex); + if (exists(alternatePath)) { + return new ZnodeSplitInfo(alternatePath, splitIndex); + } + } + } + return null; + } + @VisibleForTesting byte[] getData(final String path) throws Exception { return zkManager.getData(path); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestZKRMStateStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestZKRMStateStore.java index adb40af663b..9126df8db53 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestZKRMStateStore.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestZKRMStateStore.java @@ -69,7 +69,6 @@ import org.apache.zookeeper.KeeperException; import org.apache.zookeeper.ZooDefs.Perms; import org.apache.zookeeper.data.ACL; import org.junit.After; -import org.junit.Assert; import org.junit.Before; import org.junit.Test; @@ -80,16 +79,20 @@ import com.google.common.collect.Lists; import static org.junit.Assert.assertArrayEquals; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.spy; import static org.mockito.Mockito.when; import java.io.IOException; import java.util.HashMap; +import java.util.HashSet; import java.util.List; import java.util.Map; +import java.util.Set; import javax.crypto.SecretKey; @@ -133,10 +136,9 @@ public class TestZKRMStateStore extends RMStateStoreTestBase { TestZKRMStateStoreInternal store; String workingZnode; - class TestZKRMStateStoreInternal extends ZKRMStateStore { - public TestZKRMStateStoreInternal(Configuration conf, String workingZnode) + TestZKRMStateStoreInternal(Configuration conf, String workingZnode) throws Exception { setResourceManager(new ResourceManager()); init(conf); @@ -144,7 +146,7 @@ public class TestZKRMStateStore extends RMStateStoreTestBase { assertTrue(znodeWorkingPath.equals(workingZnode)); } - public String getVersionNode() { + private String getVersionNode() { return znodeWorkingPath + "/" + ROOT_ZNODE_NAME + "/" + VERSION_NODE; } @@ -166,11 +168,11 @@ public class TestZKRMStateStore extends RMStateStoreTestBase { return rootPath + "/" + appPath; } - public String getAppNode(String appId) { + private String getAppNode(String appId) { return getAppNode(appId, 0); } - public String getAttemptNode(String appId, String attemptId) { + private String getAttemptNode(String appId, String attemptId) { return getAppNode(appId) + "/" + attemptId; } @@ -178,10 +180,28 @@ public class TestZKRMStateStore extends RMStateStoreTestBase { * Emulating retrying createRootDir not to raise NodeExist exception * @throws Exception */ - public void testRetryingCreateRootDir() throws Exception { + private void testRetryingCreateRootDir() throws Exception { create(znodeWorkingPath); } + private String getDelegationTokenNode(int rmDTSequenceNumber, int splitIdx) { + String rootPath = workingZnode + "/" + ROOT_ZNODE_NAME + "/" + + RM_DT_SECRET_MANAGER_ROOT + "/" + + RMStateStore.RM_DELEGATION_TOKENS_ROOT_ZNODE_NAME; + String nodeName = DELEGATION_TOKEN_PREFIX; + if (splitIdx == 0) { + nodeName += rmDTSequenceNumber; + } else { + nodeName += String.format("%04d", rmDTSequenceNumber); + } + String path = nodeName; + if (splitIdx != 0) { + int idx = nodeName.length() - splitIdx; + path = splitIdx + "/" + nodeName.substring(0, idx) + "/" + + nodeName.substring(idx); + } + return rootPath + "/" + path; + } } private RMStateStore createStore(Configuration conf) throws Exception { @@ -239,6 +259,17 @@ public class TestZKRMStateStore extends RMStateStoreTestBase { .forPath(store.getAttemptNode( attemptId.getApplicationId().toString(), attemptId.toString())); } + + public boolean delegationTokenExists(RMDelegationTokenIdentifier token, + int index) throws Exception { + int rmDTSequenceNumber = token.getSequenceNumber(); + return curatorFramework.checkExists().forPath( + store.getDelegationTokenNode(rmDTSequenceNumber, index)) != null; + } + + public int getDelegationTokenNodeSplitIndex() { + return store.delegationTokenNodeSplitIndex; + } } @Test (timeout = 60000) @@ -336,7 +367,8 @@ public class TestZKRMStateStore extends RMStateStoreTestBase { RMStateStore store = zkTester.getRMStateStore(); Version defaultVersion = zkTester.getCurrentVersion(); store.checkVersion(); - Assert.assertEquals(defaultVersion, store.loadVersion()); + assertEquals("Store had wrong version", + defaultVersion, store.loadVersion()); } public static Configuration createHARMConf(String rmIds, String rmId, @@ -550,11 +582,20 @@ public class TestZKRMStateStore extends RMStateStoreTestBase { new Text("renewer1"), new Text("realuser1")); Long renewDate1 = new Long(System.currentTimeMillis()); dtId1.setSequenceNumber(1111); + assertFalse("Token " + dtId1 + + " should not exist but was found in ZooKeeper", + zkTester.delegationTokenExists(dtId1, 0)); store.storeRMDelegationToken(dtId1, renewDate1); + assertFalse("Token " + dtId1 + + " should not exist but was found in ZooKeeper", + zkTester.delegationTokenExists(dtId1, 0)); assertEquals("RMStateStore should have been in fenced state", true, store.isFencedState()); store.updateRMDelegationToken(dtId1, renewDate1); + assertFalse("Token " + dtId1 + + " should not exist but was found in ZooKeeper", + zkTester.delegationTokenExists(dtId1, 0)); assertEquals("RMStateStore should have been in fenced state", true, store.isFencedState()); @@ -610,7 +651,7 @@ public class TestZKRMStateStore extends RMStateStoreTestBase { try { store.removeApplicationStateInternal(appStateRemoved); } catch (KeeperException.NoNodeException nne) { - Assert.fail("NoNodeException should not happen."); + fail("NoNodeException should not happen."); } store.close(); } @@ -1128,4 +1169,317 @@ public class TestZKRMStateStore extends RMStateStoreTestBase { // Close the state store. store.close(); } + + private static Configuration createConfForDelegationTokenNodeSplit( + int splitIndex) { + YarnConfiguration conf = new YarnConfiguration(); + conf.setInt(YarnConfiguration.ZK_DELEGATION_TOKEN_NODE_SPLIT_INDEX, + splitIndex); + return conf; + } + + private void verifyDelegationTokensStateStore( + TestZKRMStateStoreTester zkTester, + Map tokensWithRenewal, + Map tokensWithIndex, + int sequenceNumber) throws Exception { + RMStateStore.RMDTSecretManagerState secretManagerState = + zkTester.store.loadState().getRMDTSecretManagerState(); + assertEquals("Unexpected token state", + tokensWithRenewal, secretManagerState.getTokenState()); + assertEquals("Unexpected sequence number", sequenceNumber, + secretManagerState.getDTSequenceNumber()); + for (Map.Entry tokenEntry + : tokensWithIndex.entrySet()) { + assertTrue("Expected to find token " + tokenEntry.getKey() + + " in zookeeper but did not", + zkTester.delegationTokenExists(tokenEntry.getKey(), + tokenEntry.getValue())); + } + } + + private void verifyDelegationTokenInStateStore( + TestZKRMStateStoreTester zkTester, RMDelegationTokenIdentifier token, + long renewDate, int index) throws Exception { + RMStateStore.RMDTSecretManagerState secretManagerState = + zkTester.store.loadState().getRMDTSecretManagerState(); + Map tokenState = + secretManagerState.getTokenState(); + assertTrue("token state does not contain " + token, + tokenState.containsKey(token)); + assertTrue("token state does not contain a token with renewal " + renewDate, + tokenState.containsValue(renewDate)); + assertTrue("Token " + token + "should exist but was not found in ZooKeeper", + zkTester.delegationTokenExists(token, index)); + } + + private RMDelegationTokenIdentifier storeUpdateAndVerifyDelegationToken( + TestZKRMStateStoreTester zkTester, + Map tokensWithRenewal, + Map tokensWithIndex, + int sequenceNumber, int split) throws Exception { + // Store token + RMDelegationTokenIdentifier token = + new RMDelegationTokenIdentifier(new Text("owner"), + new Text("renewer"), new Text("realuser")); + assertFalse("Token should not exist but was found in ZooKeeper", + zkTester.delegationTokenExists(token, split)); + token.setSequenceNumber(sequenceNumber); + Long renewDate = System.currentTimeMillis(); + zkTester.store.storeRMDelegationToken(token, renewDate); + modifyRMDelegationTokenState(); + tokensWithRenewal.put(token, renewDate); + tokensWithIndex.put(token, split); + + // Verify the token + verifyDelegationTokensStateStore(zkTester, tokensWithRenewal, + tokensWithIndex, sequenceNumber); + + // Update the token + renewDate = System.currentTimeMillis(); + zkTester.store.updateRMDelegationToken(token, renewDate); + tokensWithRenewal.put(token, renewDate); + tokensWithIndex.put(token, split); + + // Verify updates + verifyDelegationTokensStateStore(zkTester, tokensWithRenewal, + tokensWithIndex, sequenceNumber); + + return token; + } + + @Test + public void testDelegationTokenSplitIndexConfig() throws Exception { + // Valid values + TestZKRMStateStoreTester zkTester = new TestZKRMStateStoreTester(); + zkTester.getRMStateStore(createConfForDelegationTokenNodeSplit(0)).close(); + assertEquals("Incorrect split index", + 0, zkTester.getDelegationTokenNodeSplitIndex()); + zkTester.getRMStateStore(createConfForDelegationTokenNodeSplit(1)).close(); + assertEquals("Incorrect split index", + 1, zkTester.getDelegationTokenNodeSplitIndex()); + zkTester.getRMStateStore(createConfForDelegationTokenNodeSplit(2)).close(); + assertEquals("Incorrect split index", + 2, zkTester.getDelegationTokenNodeSplitIndex()); + zkTester.getRMStateStore(createConfForDelegationTokenNodeSplit(3)).close(); + assertEquals("Incorrect split index", + 3, zkTester.getDelegationTokenNodeSplitIndex()); + zkTester.getRMStateStore(createConfForDelegationTokenNodeSplit(4)).close(); + assertEquals("Incorrect split index", + 4, zkTester.getDelegationTokenNodeSplitIndex()); + + // Invalid values --> override to 0 + zkTester.getRMStateStore(createConfForDelegationTokenNodeSplit(-1)).close(); + assertEquals("Incorrect split index", + 0, zkTester.getDelegationTokenNodeSplitIndex()); + zkTester.getRMStateStore(createConfForDelegationTokenNodeSplit(5)).close(); + assertEquals("Incorrect split index", + 0, zkTester.getDelegationTokenNodeSplitIndex()); + } + + @Test + public void testDelegationTokenNodeNoSplit() throws Exception { + testDelegationTokenNode(0); + } + + @Test + public void testDelegationTokenNodeWithSplitOne() throws Exception { + testDelegationTokenNode(1); + } + + @Test + public void testDelegationTokenNodeWithSplitTwo() throws Exception { + testDelegationTokenNode(2); + } + + @Test + public void testDelegationTokenNodeWithSplitThree() throws Exception { + testDelegationTokenNode(3); + } + + @Test + public void testDelegationTokenNodeWithSplitFour() throws Exception { + testDelegationTokenNode(4); + } + + public void testDelegationTokenNode(int split) throws Exception { + TestZKRMStateStoreTester zkTester = new TestZKRMStateStoreTester(); + Configuration conf = createConfForDelegationTokenNodeSplit(split); + RMStateStore store = zkTester.getRMStateStore(conf); + + // Store the token and verify + Map tokensWithRenewal = new HashMap<>(); + Map tokensWithIndex = new HashMap<>(); + int sequenceNumber = 0; + RMDelegationTokenIdentifier token = storeUpdateAndVerifyDelegationToken( + zkTester, tokensWithRenewal, tokensWithIndex, sequenceNumber, split); + + // Delete the token and verify + store.removeRMDelegationToken(token); + RMStateStore.RMDTSecretManagerState state = + store.loadState().getRMDTSecretManagerState(); + tokensWithRenewal.clear(); + tokensWithIndex.clear(); + assertEquals("Unexpected token state", + tokensWithRenewal, state.getTokenState()); + assertEquals("Unexpected sequence number", + sequenceNumber, state.getDTSequenceNumber()); + assertFalse("Token should not exist but was found in ZooKeeper", + zkTester.delegationTokenExists(token, split)); + store.close(); + } + + @Test + public void testDelegationTokenNodeWithSplitMultiple() throws Exception { + TestZKRMStateStoreTester zkTester = new TestZKRMStateStoreTester(); + Configuration conf = createConfForDelegationTokenNodeSplit(1); + RMStateStore store = zkTester.getRMStateStore(conf); + + // With the split set to 1, we can store 10 tokens under a znode (i.e. 0-9) + // Try to store more than 10 + Map tokensWithRenewal = new HashMap<>(); + Map tokensWithIndex = new HashMap<>(); + Set tokensToDelete = new HashSet<>(); + int sequenceNumber = 0; + for (int i = 0; i <= 12; i++) { + RMDelegationTokenIdentifier token = + new RMDelegationTokenIdentifier(new Text("owner" + i), + new Text("renewer" + i), new Text("realuser" + i)); + sequenceNumber = i; + token.setSequenceNumber(sequenceNumber); + assertFalse("Token should not exist but was found in ZooKeeper", + zkTester.delegationTokenExists(token, 1)); + Long renewDate = System.currentTimeMillis(); + store.storeRMDelegationToken(token, renewDate); + modifyRMDelegationTokenState(); + tokensWithRenewal.put(token, renewDate); + tokensWithIndex.put(token, 1); + switch (i) { + case 0: + case 3: + case 6: + case 11: + tokensToDelete.add(token); + break; + default: + break; + } + } + // Verify + verifyDelegationTokensStateStore(zkTester, tokensWithRenewal, + tokensWithIndex, sequenceNumber); + + // Try deleting some tokens and adding some new ones + for (RMDelegationTokenIdentifier tokenToDelete : tokensToDelete) { + store.removeRMDelegationToken(tokenToDelete); + tokensWithRenewal.remove(tokenToDelete); + tokensWithIndex.remove(tokenToDelete); + } + for (int i = 13; i <= 22; i++) { + RMDelegationTokenIdentifier token = + new RMDelegationTokenIdentifier(new Text("owner" + i), + new Text("renewer" + i), new Text("realuser" + i)); + sequenceNumber = i; + token.setSequenceNumber(sequenceNumber); + Long renewDate = System.currentTimeMillis(); + store.storeRMDelegationToken(token, renewDate); + modifyRMDelegationTokenState(); + tokensWithRenewal.put(token, renewDate); + tokensWithIndex.put(token, 1); + } + // Verify + verifyDelegationTokensStateStore(zkTester, tokensWithRenewal, + tokensWithIndex, sequenceNumber); + for (RMDelegationTokenIdentifier token : tokensToDelete) { + assertFalse("Token " + token + + " should not exist but was found in ZooKeeper", + zkTester.delegationTokenExists(token, 1)); + } + store.close(); + } + + @Test + public void testDelegationTokenNodeWithSplitChangeAcrossRestarts() + throws Exception { + TestZKRMStateStoreTester zkTester = new TestZKRMStateStoreTester(); + Map tokensWithRenewal = new HashMap<>(); + Map tokensWithIndex = new HashMap<>(); + int sequenceNumber = 0; + + // Start the store with index 1 + Configuration conf = createConfForDelegationTokenNodeSplit(1); + RMStateStore store = zkTester.getRMStateStore(conf); + // Store a token with index 1 + RMDelegationTokenIdentifier token1 = storeUpdateAndVerifyDelegationToken( + zkTester, tokensWithRenewal, tokensWithIndex, sequenceNumber, 1); + store.close(); + + // Start the store with index 2 + conf = createConfForDelegationTokenNodeSplit(2); + store = zkTester.getRMStateStore(conf); + // Verify token1 is still there and under the /1/ znode + verifyDelegationTokenInStateStore( + zkTester, token1, tokensWithRenewal.get(token1), 1); + // Store a token with index 2 + sequenceNumber++; + RMDelegationTokenIdentifier token2 = storeUpdateAndVerifyDelegationToken( + zkTester, tokensWithRenewal, tokensWithIndex, sequenceNumber, 2); + // Update and verify token1 + long renewDate1 = System.currentTimeMillis(); + zkTester.store.updateRMDelegationToken(token1, renewDate1); + tokensWithRenewal.put(token1, renewDate1); + verifyDelegationTokenInStateStore( + zkTester, token1, tokensWithRenewal.get(token1), 1); + store.close(); + + // Start the store with index 0 + conf = createConfForDelegationTokenNodeSplit(0); + store = zkTester.getRMStateStore(conf); + // Verify token1 is still there and under the /1/ znode + verifyDelegationTokenInStateStore( + zkTester, token1, tokensWithRenewal.get(token1), 1); + // Verify token2 is still there and under the /2/ znode + verifyDelegationTokenInStateStore( + zkTester, token2, tokensWithRenewal.get(token2), 2); + // Store a token with no index + sequenceNumber++; + RMDelegationTokenIdentifier token0 = storeUpdateAndVerifyDelegationToken( + zkTester, tokensWithRenewal, tokensWithIndex, sequenceNumber, 0); + store.close(); + + // Start the store with index 3 + conf = createConfForDelegationTokenNodeSplit(3); + store = zkTester.getRMStateStore(conf); + // Verify token1 is still there and under the /1/ znode + verifyDelegationTokenInStateStore( + zkTester, token1, tokensWithRenewal.get(token1), 1); + // Verify token2 is still there and under the /2/ znode + verifyDelegationTokenInStateStore( + zkTester, token2, tokensWithRenewal.get(token2), 2); + // Verify token0 is still there and under the token root node + verifyDelegationTokenInStateStore( + zkTester, token0, tokensWithRenewal.get(token0), 0); + // Delete all tokens and verify + for (RMDelegationTokenIdentifier token : tokensWithRenewal.keySet()) { + store.removeRMDelegationToken(token); + } + tokensWithRenewal.clear(); + tokensWithIndex.clear(); + verifyDelegationTokensStateStore( + zkTester, tokensWithRenewal, tokensWithIndex, sequenceNumber); + assertFalse("Token " + token1 + + " should not exist but was found in ZooKeeper", + zkTester.delegationTokenExists(token1, 1)); + assertFalse("Token " + token1 + + " should not exist but was found in ZooKeeper", + zkTester.delegationTokenExists(token2, 2)); + assertFalse("Token " + token1 + + " should not exist but was found in ZooKeeper", + zkTester.delegationTokenExists(token0, 0)); + // Store a token with index 3 + sequenceNumber++; + storeUpdateAndVerifyDelegationToken(zkTester, tokensWithRenewal, + tokensWithIndex, sequenceNumber, 3); + store.close(); + } }