From a8512d5aeb644fade2282b616f5ea3ea47546640 Mon Sep 17 00:00:00 2001 From: Daniel Templeton Date: Fri, 28 Apr 2017 13:26:36 -0700 Subject: [PATCH] YARN-2962. ZKRMStateStore: Limit the number of znodes under a znode (Contributed by Varun Sexena via Daniel Templeton) (cherry picked from commit 2e52789edf68016e7a3f450164f8bd3d8e6cb210) --- .../hadoop/yarn/conf/YarnConfiguration.java | 4 + .../src/main/resources/yarn-default.xml | 22 +- .../recovery/ZKRMStateStore.java | 506 ++++++++++++---- .../resourcemanager/TestRMStoreCommands.java | 15 +- .../recovery/RMStateStoreTestBase.java | 10 +- .../recovery/TestZKRMStateStore.java | 572 +++++++++++++++++- 6 files changed, 1013 insertions(+), 116 deletions(-) diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java index 3eeca0381db..dba9b90e58f 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java @@ -532,6 +532,10 @@ public class YarnConfiguration extends Configuration { public static final String RM_ZK_TIMEOUT_MS = RM_ZK_PREFIX + "timeout-ms"; public static final int DEFAULT_RM_ZK_TIMEOUT_MS = 10000; + public static final String ZK_APPID_NODE_SPLIT_INDEX = + RM_ZK_PREFIX + "appid-node.split-index"; + public static final int DEFAULT_ZK_APPID_NODE_SPLIT_INDEX = 0; + public static final String RM_ZK_ACL = RM_ZK_PREFIX + "acl"; public static final String DEFAULT_RM_ZK_ACL = "world:anyone:rwcda"; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml index 3b869e6175c..72c9004e047 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml @@ -605,7 +605,27 @@ - Name of the cluster. In an HA setting, + Index at which last section of application id (with each section + separated by _ in application id) will be split so that application znode + stored in zookeeper RM state store will be stored as two different znodes + (parent-child). Split is done from the end. + For instance, with no split, appid znode will be of the form + application_1352994193343_0001. If the value of this config is 1, the + appid znode will be broken into two parts application_1352994193343_000 + and 1 respectively with former being the parent node. + application_1352994193343_0002 will then be stored as 2 under the parent + node application_1352994193343_000. This config can take values from 0 to 4. + 0 means there will be no split. If configuration value is outside this + range, it will be treated as config value of 0(i.e. no split). A value + larger than 0 (up to 4) should be configured if you are storing a large number + of apps in ZK based RM state store and state store operations are failing due to + LenError in Zookeeper. + yarn.resourcemanager.zk-appid-node.split-index + 0 + + + + Name of the cluster. In a HA setting, this is used to ensure the RM participates in leader election for this cluster and ensures it does not affect other clusters diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/ZKRMStateStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/ZKRMStateStore.java index b9863f270ec..10ed880f7f9 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/ZKRMStateStore.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/ZKRMStateStore.java @@ -57,6 +57,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.Ap import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.ApplicationStateDataPBImpl; import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.EpochPBImpl; import org.apache.zookeeper.CreateMode; +import org.apache.zookeeper.KeeperException; import org.apache.zookeeper.ZooDefs; import org.apache.zookeeper.data.ACL; import org.apache.zookeeper.data.Id; @@ -72,6 +73,8 @@ import java.security.NoSuchAlgorithmException; import java.util.ArrayList; import java.util.HashMap; import java.util.List; +import java.util.Map; +import java.util.Set; /** * {@link RMStateStore} implementation backed by ZooKeeper. @@ -82,6 +85,31 @@ import java.util.List; * |--- EPOCH_NODE * |--- RM_ZK_FENCING_LOCK * |--- RM_APP_ROOT + * | |----- HIERARCHIES + * | | |----- 1 + * | | | |----- (#ApplicationId barring last character) + * | | | | |----- (#Last character of ApplicationId) + * | | | | | |----- (#ApplicationAttemptIds) + * | | | .... + * | | | + * | | |----- 2 + * | | | |----- (#ApplicationId barring last 2 characters) + * | | | | |----- (#Last 2 characters of ApplicationId) + * | | | | | |----- (#ApplicationAttemptIds) + * | | | .... + * | | | + * | | |----- 3 + * | | | |----- (#ApplicationId barring last 3 characters) + * | | | | |----- (#Last 3 characters of ApplicationId) + * | | | | | |----- (#ApplicationAttemptIds) + * | | | .... + * | | | + * | | |----- 4 + * | | | |----- (#ApplicationId barring last 4 characters) + * | | | | |----- (#Last 4 characters of ApplicationId) + * | | | | | |----- (#ApplicationAttemptIds) + * | | | .... + * | | | * | |----- (#ApplicationId1) * | | |----- (#ApplicationAttemptIds) * | | @@ -121,6 +149,7 @@ import java.util.List; @Unstable public class ZKRMStateStore extends RMStateStore { private static final Log LOG = LogFactory.getLog(ZKRMStateStore.class); + private static final String RM_DELEGATION_TOKENS_ROOT_ZNODE_NAME = "RMDelegationTokensRoot"; private static final String RM_DT_SEQUENTIAL_NUMBER_ZNODE_NAME = @@ -129,12 +158,15 @@ public class ZKRMStateStore extends RMStateStore { "RMDTMasterKeysRoot"; @VisibleForTesting public static final String ROOT_ZNODE_NAME = "ZKRMStateRoot"; - protected static final Version CURRENT_VERSION_INFO = - Version.newInstance(1, 3); + protected static final Version CURRENT_VERSION_INFO = Version + .newInstance(2, 0); + @VisibleForTesting + public static final String RM_APP_ROOT_HIERARCHIES = "HIERARCHIES"; /* Znode paths */ private String zkRootNodePath; private String rmAppRoot; + private Map rmAppRootHierarchies; private String rmDTSecretManagerRoot; private String dtMasterKeysRootPath; private String delegationTokensRootPath; @@ -144,6 +176,7 @@ public class ZKRMStateStore extends RMStateStore { @VisibleForTesting protected String znodeWorkingPath; + private int appIdNodeSplitIndex = 0; /* Fencing related variables */ private static final String FENCING_LOCK = "RM_ZK_FENCING_LOCK"; @@ -165,6 +198,27 @@ public class ZKRMStateStore extends RMStateStore { @VisibleForTesting protected CuratorFramework curatorFramework; + /* + * Indicates different app attempt state store operations. + */ + private enum AppAttemptOp { + STORE, + UPDATE, + REMOVE + }; + + /** + * Encapsulates full app node path and corresponding split index. + */ + private final static class AppNodeSplitInfo { + private final String path; + private final int splitIndex; + AppNodeSplitInfo(String path, int splitIndex) { + this.path = path; + this.splitIndex = splitIndex; + } + } + /** * Given the {@link Configuration} and {@link ACL}s used (sourceACLs) for * ZooKeeper access, construct the {@link ACL}s for the store's root node. @@ -212,11 +266,30 @@ public class ZKRMStateStore extends RMStateStore { conf.get(YarnConfiguration.ZK_RM_STATE_STORE_PARENT_PATH, YarnConfiguration.DEFAULT_ZK_RM_STATE_STORE_PARENT_PATH); zkRootNodePath = getNodePath(znodeWorkingPath, ROOT_ZNODE_NAME); - fencingNodePath = getNodePath(zkRootNodePath, FENCING_LOCK); rmAppRoot = getNodePath(zkRootNodePath, RM_APP_ROOT); + String hierarchiesPath = getNodePath(rmAppRoot, RM_APP_ROOT_HIERARCHIES); + rmAppRootHierarchies = new HashMap<>(5); + rmAppRootHierarchies.put(0, rmAppRoot); + for (int splitIndex = 1; splitIndex <= 4; splitIndex++) { + rmAppRootHierarchies.put(splitIndex, + getNodePath(hierarchiesPath, Integer.toString(splitIndex))); + } + + fencingNodePath = getNodePath(zkRootNodePath, FENCING_LOCK); zkSessionTimeout = conf.getInt(YarnConfiguration.RM_ZK_TIMEOUT_MS, YarnConfiguration.DEFAULT_RM_ZK_TIMEOUT_MS); + appIdNodeSplitIndex = + conf.getInt(YarnConfiguration.ZK_APPID_NODE_SPLIT_INDEX, + YarnConfiguration.DEFAULT_ZK_APPID_NODE_SPLIT_INDEX); + if (appIdNodeSplitIndex < 1 || appIdNodeSplitIndex > 4) { + LOG.info("Invalid value " + appIdNodeSplitIndex + " for config " + + YarnConfiguration.ZK_APPID_NODE_SPLIT_INDEX + " specified. " + + "Resetting it to " + + YarnConfiguration.DEFAULT_ZK_APPID_NODE_SPLIT_INDEX); + appIdNodeSplitIndex = YarnConfiguration.DEFAULT_ZK_APPID_NODE_SPLIT_INDEX; + } + zkAcl = RMZKUtils.getZKAcls(conf); if (HAUtil.isHAEnabled(conf)) { @@ -269,6 +342,10 @@ public class ZKRMStateStore extends RMStateStore { verifyActiveStatusThread.start(); } create(rmAppRoot); + create(getNodePath(rmAppRoot, RM_APP_ROOT_HIERARCHIES)); + for (int splitIndex = 1; splitIndex <= 4; splitIndex++) { + create(rmAppRootHierarchies.get(splitIndex)); + } create(rmDTSecretManagerRoot); create(dtMasterKeysRootPath); create(delegationTokensRootPath); @@ -525,42 +602,64 @@ public class ZKRMStateStore extends RMStateStore { } } + private void loadRMAppStateFromAppNode(RMState rmState, String appNodePath, + String appIdStr) throws Exception { + byte[] appData = getData(appNodePath); + if (LOG.isDebugEnabled()) { + LOG.debug("Loading application from znode: " + appNodePath); + } + ApplicationId appId = ApplicationId.fromString(appIdStr); + ApplicationStateDataPBImpl appState = new ApplicationStateDataPBImpl( + ApplicationStateDataProto.parseFrom(appData)); + if (!appId.equals( + appState.getApplicationSubmissionContext().getApplicationId())) { + throw new YarnRuntimeException("The node name is different from the " + + "application id"); + } + rmState.appState.put(appId, appState); + loadApplicationAttemptState(appState, appNodePath); + } + private synchronized void loadRMAppState(RMState rmState) throws Exception { - List childNodes = getChildren(rmAppRoot); - - for (String childNodeName : childNodes) { - String childNodePath = getNodePath(rmAppRoot, childNodeName); - byte[] childData = getData(childNodePath); - - if (childNodeName.startsWith(ApplicationId.appIdStrPrefix)) { - // application - if (LOG.isDebugEnabled()) { - LOG.debug("Loading application from znode: " + childNodeName); + for (int splitIndex = 0; splitIndex <= 4; splitIndex++) { + String appRoot = rmAppRootHierarchies.get(splitIndex); + if (appRoot == null) { + continue; + } + List childNodes = getChildren(appRoot); + boolean appNodeFound = false; + for (String childNodeName : childNodes) { + if (childNodeName.startsWith(ApplicationId.appIdStrPrefix)) { + appNodeFound = true; + if (splitIndex == 0) { + loadRMAppStateFromAppNode(rmState, + getNodePath(appRoot, childNodeName), childNodeName); + } else { + // If AppId Node is partitioned. + String parentNodePath = getNodePath(appRoot, childNodeName); + List leafNodes = getChildren(parentNodePath); + for (String leafNodeName : leafNodes) { + String appIdStr = childNodeName + leafNodeName; + loadRMAppStateFromAppNode(rmState, + getNodePath(parentNodePath, leafNodeName), appIdStr); + } + } + } else { + LOG.info("Unknown child node with name: " + childNodeName); } - - ApplicationId appId = ApplicationId.fromString(childNodeName); - ApplicationStateDataPBImpl appState = - new ApplicationStateDataPBImpl( - ApplicationStateDataProto.parseFrom(childData)); - - if (!appId.equals( - appState.getApplicationSubmissionContext().getApplicationId())) { - throw new YarnRuntimeException("The child node name is different " - + "from the application id"); - } - - rmState.appState.put(appId, appState); - loadApplicationAttemptState(appState, appId); - } else { - LOG.info("Unknown child node with name: " + childNodeName); + } + if (splitIndex != appIdNodeSplitIndex && !appNodeFound) { + // If no loaded app exists for a particular split index and the split + // index for which apps are being loaded is not the one configured, then + // we do not need to keep track of this hierarchy for storing/updating/ + // removing app/app attempt znodes. + rmAppRootHierarchies.remove(splitIndex); } } } private void loadApplicationAttemptState(ApplicationStateData appState, - ApplicationId appId) - throws Exception { - String appPath = getNodePath(rmAppRoot, appId.toString()); + String appPath) throws Exception { List attempts = getChildren(appPath); for (String attemptIDStr : attempts) { @@ -575,14 +674,68 @@ public class ZKRMStateStore extends RMStateStore { appState.attempts.put(attemptState.getAttemptId(), attemptState); } } - LOG.debug("Done loading applications from ZK state store"); } + /** + * Get parent app node path based on full path and split index supplied. + * @param appIdPath App id path for which parent needs to be returned. + * @param splitIndex split index. + * @return parent app node path. + */ + private String getSplitAppNodeParent(String appIdPath, int splitIndex) { + // Calculated as string upto index (appIdPath Length - split index - 1). We + // deduct 1 to exclude path separator. + return appIdPath.substring(0, appIdPath.length() - splitIndex - 1); + } + + /** + * Checks if parent app node has no leaf nodes and if it does not have, + * removes it. Called while removing application. + * @param appIdPath path of app id to be removed. + * @param splitIndex split index. + * @throws Exception if any problem occurs while performing ZK operation. + */ + private void checkRemoveParentAppNode(String appIdPath, int splitIndex) + throws Exception { + if (splitIndex != 0) { + String parentAppNode = getSplitAppNodeParent(appIdPath, splitIndex); + List children = null; + try { + children = getChildren(parentAppNode); + } catch (KeeperException.NoNodeException ke) { + // It should be fine to swallow this exception as the parent app node we + // intend to delete is already deleted. + if (LOG.isDebugEnabled()) { + LOG.debug("Unable to remove app parent node " + parentAppNode + + " as it does not exist."); + } + return; + } + // No apps stored under parent path. + if (children != null && children.isEmpty()) { + try { + safeDelete(parentAppNode); + if (LOG.isDebugEnabled()) { + LOG.debug("No leaf app node exists. Removing parent node " + + parentAppNode); + } + } catch (KeeperException.NotEmptyException ke) { + // It should be fine to swallow this exception as the parent app node + // has to be deleted only if it has no children. And this node has. + if (LOG.isDebugEnabled()) { + LOG.debug("Unable to remove app parent node " + parentAppNode + + " as it has children."); + } + } + } + } + } + @Override public synchronized void storeApplicationStateInternal(ApplicationId appId, ApplicationStateData appStateDataPB) throws Exception { - String nodeCreatePath = getNodePath(rmAppRoot, appId.toString()); + String nodeCreatePath = getLeafAppIdNodePath(appId.toString(), true); if (LOG.isDebugEnabled()) { LOG.debug("Storing info for app: " + appId + " at: " + nodeCreatePath); @@ -597,7 +750,26 @@ public class ZKRMStateStore extends RMStateStore { protected synchronized void updateApplicationStateInternal( ApplicationId appId, ApplicationStateData appStateDataPB) throws Exception { - String nodeUpdatePath = getNodePath(rmAppRoot, appId.toString()); + String nodeUpdatePath = getLeafAppIdNodePath(appId.toString(), false); + boolean pathExists = true; + // Look for paths based on other split indices if path as per split index + // does not exist. + if (!exists(nodeUpdatePath)) { + AppNodeSplitInfo alternatePathInfo = getAlternatePath(appId.toString()); + if (alternatePathInfo != null) { + nodeUpdatePath = alternatePathInfo.path; + } else { + // No alternate path exists. Create path as per configured split index. + pathExists = false; + if (appIdNodeSplitIndex != 0) { + String rootNode = + getSplitAppNodeParent(nodeUpdatePath, appIdNodeSplitIndex); + if (!exists(rootNode)) { + safeCreate(rootNode, null, zkAcl, CreateMode.PERSISTENT); + } + } + } + } if (LOG.isDebugEnabled()) { LOG.debug("Storing final state info for app: " + appId + " at: " @@ -606,34 +778,79 @@ public class ZKRMStateStore extends RMStateStore { byte[] appStateData = appStateDataPB.getProto().toByteArray(); - if (exists(nodeUpdatePath)) { + if (pathExists) { safeSetData(nodeUpdatePath, appStateData, -1); } else { - safeCreate(nodeUpdatePath, appStateData, zkAcl, - CreateMode.PERSISTENT); + safeCreate(nodeUpdatePath, appStateData, zkAcl, CreateMode.PERSISTENT); if (LOG.isDebugEnabled()) { - LOG.debug(appId + " znode didn't exist. Created a new znode to" - + " update the application state."); + LOG.debug("Path " + nodeUpdatePath + " for " + appId + " didn't " + + "exist. Creating a new znode to update the application state."); } } } + /* + * Handles store, update and remove application attempt state store + * operations. + */ + private void handleApplicationAttemptStateOp( + ApplicationAttemptId appAttemptId, + ApplicationAttemptStateData attemptStateDataPB, AppAttemptOp operation) + throws Exception { + String appId = appAttemptId.getApplicationId().toString(); + String appDirPath = getLeafAppIdNodePath(appId, false); + // Look for paths based on other split indices. + if (!exists(appDirPath)) { + AppNodeSplitInfo alternatePathInfo = getAlternatePath(appId); + if (alternatePathInfo == null) { + if (operation == AppAttemptOp.REMOVE) { + // Unexpected. Assume that app attempt has been deleted. + return; + } else { // Store or Update operation + throw new YarnRuntimeException("Unexpected Exception. App node for " + + "app " + appId + " not found"); + } + } else { + appDirPath = alternatePathInfo.path; + } + } + String path = getNodePath(appDirPath, appAttemptId.toString()); + byte[] attemptStateData = (attemptStateDataPB == null) ? null : + attemptStateDataPB.getProto().toByteArray(); + if (LOG.isDebugEnabled()) { + LOG.debug(operation + " info for attempt: " + appAttemptId + " at: " + + path); + } + switch (operation) { + case UPDATE: + if (exists(path)) { + safeSetData(path, attemptStateData, -1); + } else { + safeCreate(path, attemptStateData, zkAcl, CreateMode.PERSISTENT); + if (LOG.isDebugEnabled()) { + LOG.debug("Path " + path + " for " + appAttemptId + " didn't exist." + + " Created a new znode to update the application attempt state."); + } + } + break; + case STORE: + safeCreate(path, attemptStateData, zkAcl, CreateMode.PERSISTENT); + break; + case REMOVE: + safeDelete(path); + break; + default: + break; + } + } + @Override protected synchronized void storeApplicationAttemptStateInternal( ApplicationAttemptId appAttemptId, ApplicationAttemptStateData attemptStateDataPB) throws Exception { - String appDirPath = getNodePath(rmAppRoot, - appAttemptId.getApplicationId().toString()); - String nodeCreatePath = getNodePath(appDirPath, appAttemptId.toString()); - - if (LOG.isDebugEnabled()) { - LOG.debug("Storing info for attempt: " + appAttemptId + " at: " - + nodeCreatePath); - } - - byte[] attemptStateData = attemptStateDataPB.getProto().toByteArray(); - safeCreate(nodeCreatePath, attemptStateData, zkAcl, CreateMode.PERSISTENT); + handleApplicationAttemptStateOp(appAttemptId, attemptStateDataPB, + AppAttemptOp.STORE); } @Override @@ -641,65 +858,73 @@ public class ZKRMStateStore extends RMStateStore { ApplicationAttemptId appAttemptId, ApplicationAttemptStateData attemptStateDataPB) throws Exception { - String appIdStr = appAttemptId.getApplicationId().toString(); - String appAttemptIdStr = appAttemptId.toString(); - String appDirPath = getNodePath(rmAppRoot, appIdStr); - String nodeUpdatePath = getNodePath(appDirPath, appAttemptIdStr); - - if (LOG.isDebugEnabled()) { - LOG.debug("Storing final state info for attempt: " + appAttemptIdStr - + " at: " + nodeUpdatePath); - } - - byte[] attemptStateData = attemptStateDataPB.getProto().toByteArray(); - - if (exists(nodeUpdatePath)) { - safeSetData(nodeUpdatePath, attemptStateData, -1); - } else { - safeCreate(nodeUpdatePath, attemptStateData, zkAcl, - CreateMode.PERSISTENT); - if (LOG.isDebugEnabled()) { - LOG.debug(appAttemptId + " znode didn't exist. Created a new znode to" - + " update the application attempt state."); - } - } + handleApplicationAttemptStateOp(appAttemptId, attemptStateDataPB, + AppAttemptOp.UPDATE); } @Override protected synchronized void removeApplicationAttemptInternal( ApplicationAttemptId appAttemptId) throws Exception { - String appId = appAttemptId.getApplicationId().toString(); - String appIdRemovePath = getNodePath(rmAppRoot, appId); - String attemptIdRemovePath = - getNodePath(appIdRemovePath, appAttemptId.toString()); - - if (LOG.isDebugEnabled()) { - LOG.debug("Removing info for attempt: " + appAttemptId + " at: " - + attemptIdRemovePath); - } - - safeDelete(attemptIdRemovePath); + handleApplicationAttemptStateOp(appAttemptId, null, AppAttemptOp.REMOVE); } @Override protected synchronized void removeApplicationStateInternal( ApplicationStateData appState) throws Exception { - String appId = appState.getApplicationSubmissionContext().getApplicationId() - .toString(); - String appIdRemovePath = getNodePath(rmAppRoot, appId); + removeApp(appState.getApplicationSubmissionContext(). + getApplicationId().toString(), true, appState.attempts.keySet()); + } - if (LOG.isDebugEnabled()) { - LOG.debug("Removing info for app: " + appId + " at: " + appIdRemovePath - + " and its attempts."); + private void removeApp(String removeAppId) throws Exception { + removeApp(removeAppId, false, null); + } + + /** + * Remove application node and its attempt nodes. + * + * @param removeAppId Application Id to be removed. + * @param safeRemove Flag indicating if application and attempt nodes have to + * be removed safely under a fencing or not. + * @param attempts list of attempts to be removed associated with this app. + * Ignored if safeRemove flag is false as we recursively delete all the + * child nodes directly. + * @throws Exception if any exception occurs during ZK operation. + */ + private void removeApp(String removeAppId, boolean safeRemove, + Set attempts) throws Exception { + String appIdRemovePath = getLeafAppIdNodePath(removeAppId, false); + int splitIndex = appIdNodeSplitIndex; + // Look for paths based on other split indices if path as per configured + // split index does not exist. + if (!exists(appIdRemovePath)) { + AppNodeSplitInfo alternatePathInfo = getAlternatePath(removeAppId); + if (alternatePathInfo != null) { + appIdRemovePath = alternatePathInfo.path; + splitIndex = alternatePathInfo.splitIndex; + } else { + // Alternate path not found so return. + return; + } } - - for (ApplicationAttemptId attemptId : appState.attempts.keySet()) { - String attemptRemovePath = - getNodePath(appIdRemovePath, attemptId.toString()); - safeDelete(attemptRemovePath); + if (safeRemove) { + if (LOG.isDebugEnabled()) { + LOG.debug("Removing info for app: " + removeAppId + " at: " + + appIdRemovePath + " and its attempts."); + } + if (attempts != null) { + for (ApplicationAttemptId attemptId : attempts) { + String attemptRemovePath = + getNodePath(appIdRemovePath, attemptId.toString()); + safeDelete(attemptRemovePath); + } + } + safeDelete(appIdRemovePath); + } else { + curatorFramework.delete().deletingChildrenIfNeeded(). + forPath(appIdRemovePath); } - - safeDelete(appIdRemovePath); + // Check if we should remove the parent app node as well. + checkRemoveParentAppNode(appIdRemovePath, splitIndex); } @Override @@ -821,8 +1046,7 @@ public class ZKRMStateStore extends RMStateStore { @Override public synchronized void removeApplication(ApplicationId removeAppId) throws Exception { - String appIdRemovePath = getNodePath(rmAppRoot, removeAppId.toString()); - delete(appIdRemovePath); + removeApp(removeAppId.toString()); } @VisibleForTesting @@ -921,6 +1145,79 @@ public class ZKRMStateStore extends RMStateStore { } } + /** + * Get alternate path for app id if path according to configured split index + * does not exist. We look for path based on all possible split indices. + * @param appId + * @return a {@link AppNodeSplitInfo} object containing the path and split + * index if it exists, null otherwise. + * @throws Exception if any problem occurs while performing ZK operation. + */ + private AppNodeSplitInfo getAlternatePath(String appId) throws Exception { + for (Map.Entry entry : rmAppRootHierarchies.entrySet()) { + // Look for other paths + int splitIndex = entry.getKey(); + if (splitIndex != appIdNodeSplitIndex) { + String alternatePath = + getLeafAppIdNodePath(appId, entry.getValue(), splitIndex, false); + if (exists(alternatePath)) { + return new AppNodeSplitInfo(alternatePath, splitIndex); + } + } + } + return null; + } + + /** + * Returns leaf app node path based on app id and passed split index. If the + * passed flag createParentIfNotExists is true, also creates the parent app + * node if it does not exist. + * @param appId application id. + * @param rootNode app root node based on split index. + * @param appIdNodeSplitIdx split index. + * @param createParentIfNotExists flag which determines if parent app node + * needs to be created(as per split) if it does not exist. + * @return leaf app node path. + * @throws Exception if any problem occurs while performing ZK operation. + */ + private String getLeafAppIdNodePath(String appId, String rootNode, + int appIdNodeSplitIdx, boolean createParentIfNotExists) throws Exception { + if (appIdNodeSplitIdx == 0) { + return getNodePath(rootNode, appId); + } + String nodeName = appId; + int splitIdx = nodeName.length() - appIdNodeSplitIdx; + String rootNodePath = + getNodePath(rootNode, nodeName.substring(0, splitIdx)); + if (createParentIfNotExists && !exists(rootNodePath)) { + try { + safeCreate(rootNodePath, null, zkAcl, CreateMode.PERSISTENT); + } catch (KeeperException.NodeExistsException e) { + if (LOG.isDebugEnabled()) { + LOG.debug("Unable to create app parent node " + rootNodePath + + " as it already exists."); + } + } + } + return getNodePath(rootNodePath, nodeName.substring(splitIdx)); + } + + /** + * Returns leaf app node path based on app id and configured split index. If + * the passed flag createParentIfNotExists is true, also creates the parent + * app node if it does not exist. + * @param appId application id. + * @param createParentIfNotExists flag which determines if parent app node + * needs to be created(as per split) if it does not exist. + * @return leaf app node path. + * @throws Exception if any problem occurs while performing ZK operation. + */ + private String getLeafAppIdNodePath(String appId, + boolean createParentIfNotExists) throws Exception { + return getLeafAppIdNodePath(appId, rmAppRootHierarchies.get( + appIdNodeSplitIndex), appIdNodeSplitIndex, createParentIfNotExists); + } + @VisibleForTesting byte[] getData(final String path) throws Exception { return curatorFramework.getData().forPath(path); @@ -931,11 +1228,13 @@ public class ZKRMStateStore extends RMStateStore { return curatorFramework.getACL().forPath(path); } - private List getChildren(final String path) throws Exception { + @VisibleForTesting + List getChildren(final String path) throws Exception { return curatorFramework.getChildren().forPath(path); } - private boolean exists(final String path) throws Exception { + @VisibleForTesting + boolean exists(final String path) throws Exception { return curatorFramework.checkExists().forPath(path) != null; } @@ -964,6 +1263,11 @@ public class ZKRMStateStore extends RMStateStore { } } + /** + * Deletes the path. Checks for existence of path as well. + * @param path Path to be deleted. + * @throws Exception if any problem occurs while performing deletion. + */ private void safeDelete(final String path) throws Exception { if (exists(path)) { SafeTransaction transaction = new SafeTransaction(); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMStoreCommands.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMStoreCommands.java index 6c74616379e..02033518082 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMStoreCommands.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMStoreCommands.java @@ -87,8 +87,13 @@ public class TestRMStoreCommands { ZKRMStateStore.ROOT_ZNODE_NAME + "/" + RMStateStore.RM_APP_ROOT; String appIdPath = appRootPath + "/" + appId; curatorFramework.create().forPath(appIdPath); - assertEquals("Application node for " + appId + "should exist", - appId, curatorFramework.getChildren().forPath(appRootPath).get(0)); + for (String path : curatorFramework.getChildren().forPath(appRootPath)) { + if (path.equals(ZKRMStateStore.RM_APP_ROOT_HIERARCHIES)) { + continue; + } + assertEquals("Application node for " + appId + " should exist", + appId, path); + } try { ResourceManager.removeApplication(conf, appId); } catch (Exception e) { @@ -96,8 +101,10 @@ public class TestRMStoreCommands { "rm state store."); } assertTrue("After remove app from store there should be no child nodes" + - " in app root path", - curatorFramework.getChildren().forPath(appRootPath).isEmpty()); + " for application in app root path", + curatorFramework.getChildren().forPath(appRootPath).size() == 1 && + curatorFramework.getChildren().forPath(appRootPath).get(0).equals( + ZKRMStateStore.RM_APP_ROOT_HIERARCHIES)); } } } \ No newline at end of file diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreTestBase.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreTestBase.java index 8544c13d7e5..963aac84de5 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreTestBase.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreTestBase.java @@ -156,8 +156,7 @@ public class RMStateStoreTestBase { } protected RMApp storeApp(RMStateStore store, ApplicationId appId, - long submitTime, - long startTime) throws Exception { + long submitTime, long startTime) throws Exception { ApplicationSubmissionContext context = new ApplicationSubmissionContextPBImpl(); context.setApplicationId(appId); @@ -200,6 +199,13 @@ public class RMStateStoreTestBase { return mockAttempt; } + protected void updateAttempt(RMStateStore store, TestDispatcher dispatcher, + ApplicationAttemptStateData attemptState) { + dispatcher.attemptId = attemptState.getAttemptId(); + store.updateApplicationAttemptState(attemptState); + waitNotify(dispatcher); + } + void testRMAppStateStore(RMStateStoreHelper stateStoreHelper) throws Exception { testRMAppStateStore(stateStoreHelper, new StoreStateVerifier()); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestZKRMStateStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestZKRMStateStore.java index f71cf25568d..7fb9696bb8e 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestZKRMStateStore.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestZKRMStateStore.java @@ -28,6 +28,8 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.ha.HAServiceProtocol; import org.apache.hadoop.ha.HAServiceProtocol.StateChangeRequestInfo; import org.apache.hadoop.io.Text; +import org.apache.hadoop.security.Credentials; +import org.apache.hadoop.security.token.Token; import org.apache.hadoop.security.token.delegation.DelegationKey; import org.apache.hadoop.service.Service; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; @@ -40,19 +42,25 @@ import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationSubmissionContextPB import org.apache.hadoop.yarn.api.records.impl.pb.ContainerPBImpl; import org.apache.hadoop.yarn.conf.HAUtil; import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.security.AMRMTokenIdentifier; import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier; import org.apache.hadoop.yarn.server.records.Version; import org.apache.hadoop.yarn.server.records.impl.pb.VersionPBImpl; import org.apache.hadoop.yarn.server.resourcemanager.MockRM; +import org.apache.hadoop.yarn.server.resourcemanager.RMContext; import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager; +import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.RMState; import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationAttemptStateData; import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationStateData; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.AggregateAppResourceUsage; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptMetrics; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState; +import org.apache.hadoop.yarn.server.resourcemanager.security.AMRMTokenSecretManager; import org.apache.hadoop.yarn.server.resourcemanager.security.ClientToAMTokenSecretManagerInRM; +import org.apache.hadoop.yarn.server.security.MasterKeyData; import org.apache.zookeeper.KeeperException; import org.apache.zookeeper.ZooDefs.Perms; import org.apache.zookeeper.data.ACL; @@ -61,13 +69,22 @@ import org.junit.Assert; import org.junit.Before; import org.junit.Test; +import com.google.common.base.Joiner; +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.Lists; + +import static org.junit.Assert.assertArrayEquals; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertTrue; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.spy; import static org.mockito.Mockito.when; import java.io.IOException; +import java.util.HashMap; import java.util.List; +import java.util.Map; import javax.crypto.SecretKey; @@ -130,9 +147,21 @@ public class TestZKRMStateStore extends RMStateStoreTestBase { return CURRENT_VERSION_INFO; } + private String getAppNode(String appId, int splitIdx) { + String rootPath = workingZnode + "/" + ROOT_ZNODE_NAME + "/" + + RM_APP_ROOT; + String appPath = appId; + if (splitIdx != 0) { + int idx = appId.length() - splitIdx; + appPath = appId.substring(0, idx) + "/" + appId.substring(idx); + return rootPath + "/" + RM_APP_ROOT_HIERARCHIES + "/" + + Integer.toString(splitIdx) + "/" + appPath; + } + return rootPath + "/" + appPath; + } + public String getAppNode(String appId) { - return workingZnode + "/" + ROOT_ZNODE_NAME + "/" + RM_APP_ROOT + "/" - + appId; + return getAppNode(appId, 0); } public String getAttemptNode(String appId, String attemptId) { @@ -149,8 +178,7 @@ public class TestZKRMStateStore extends RMStateStoreTestBase { } - public RMStateStore getRMStateStore() throws Exception { - YarnConfiguration conf = new YarnConfiguration(); + private RMStateStore createStore(Configuration conf) throws Exception { workingZnode = "/jira/issue/3077/rmstore"; conf.set(YarnConfiguration.RM_ZK_ADDRESS, curatorTestingServer.getConnectString()); @@ -159,6 +187,15 @@ public class TestZKRMStateStore extends RMStateStoreTestBase { return this.store; } + public RMStateStore getRMStateStore(Configuration conf) throws Exception { + return createStore(conf); + } + + public RMStateStore getRMStateStore() throws Exception { + YarnConfiguration conf = new YarnConfiguration(); + return createStore(conf); + } + @Override public boolean isFinalStateValid() throws Exception { return 1 == @@ -178,8 +215,12 @@ public class TestZKRMStateStore extends RMStateStoreTestBase { } public boolean appExists(RMApp app) throws Exception { + String appIdPath = app.getApplicationId().toString(); + int split = + store.getConfig().getInt(YarnConfiguration.ZK_APPID_NODE_SPLIT_INDEX, + YarnConfiguration.DEFAULT_ZK_APPID_NODE_SPLIT_INDEX); return null != curatorFramework.checkExists() - .forPath(store.getAppNode(app.getApplicationId().toString())); + .forPath(store.getAppNode(appIdPath, split)); } public boolean attemptExists(RMAppAttempt attempt) throws Exception { @@ -342,7 +383,6 @@ public class TestZKRMStateStore extends RMStateStoreTestBase { rm.close(); } - @SuppressWarnings("unchecked") @Test public void testFencing() throws Exception { StateChangeRequestInfo req = new StateChangeRequestInfo( @@ -382,13 +422,15 @@ public class TestZKRMStateStore extends RMStateStoreTestBase { assertEquals("RM should be Active", HAServiceProtocol.HAServiceState.ACTIVE, rm2.getRMContext().getRMAdminService().getServiceStatus().getState()); + rm1.close(); + rm2.close(); } @Test public void testFencedState() throws Exception { TestZKRMStateStoreTester zkTester = new TestZKRMStateStoreTester(); - RMStateStore store = zkTester.getRMStateStore(); - + RMStateStore store = zkTester.getRMStateStore(); + // Move state to FENCED from ACTIVE store.updateFencedState(); assertEquals("RMStateStore should have been in fenced state", @@ -527,4 +569,518 @@ public class TestZKRMStateStore extends RMStateStoreTestBase { } store.close(); } + + private static String createPath(String... parts) { + return Joiner.on("/").join(parts); + } + + private static Configuration createConfForAppNodeSplit(int splitIndex) { + YarnConfiguration conf = new YarnConfiguration(); + conf.setInt(YarnConfiguration.ZK_APPID_NODE_SPLIT_INDEX, splitIndex); + return conf; + } + + private static RMApp createMockAppForRemove(ApplicationId appId, + ApplicationAttemptId... attemptIds) { + RMApp app = mock(RMApp.class); + ApplicationSubmissionContextPBImpl context = + new ApplicationSubmissionContextPBImpl(); + context.setApplicationId(appId); + when(app.getApplicationSubmissionContext()).thenReturn(context); + when(app.getUser()).thenReturn("test"); + if (attemptIds.length > 0) { + HashMap attempts = new HashMap<>(); + for (ApplicationAttemptId attemptId : attemptIds) { + RMAppAttempt appAttempt = mock(RMAppAttempt.class); + when(appAttempt.getAppAttemptId()).thenReturn(attemptId); + attempts.put(attemptId, appAttempt); + } + when(app.getAppAttempts()).thenReturn(attempts); + } + return app; + } + + private static void verifyLoadedApp(ApplicationStateData appState, + ApplicationId appId, String user, long submitTime, long startTime, + RMAppState state, long finishTime, String diagnostics) { + // Check if app is loaded correctly + assertNotNull("App " + appId + " should have been loaded.", appState); + assertEquals("App submit time in app state", submitTime, + appState.getSubmitTime()); + assertEquals("App start time in app state", startTime, + appState.getStartTime()); + assertEquals("App ID in app state", appId, + appState.getApplicationSubmissionContext().getApplicationId()); + assertEquals("App state", state, appState.getState()); + assertEquals("Finish time in app state", finishTime, + appState.getFinishTime()); + assertEquals("User in app state", user, appState.getUser()); + assertEquals("Diagnostics in app state", diagnostics, + appState.getDiagnostics()); + } + + private static void verifyLoadedApp(RMState rmState, + ApplicationId appId, long submitTime, long startTime, long finishTime, + boolean isFinished, List attempts) { + verifyLoadedApp(rmState, appId, submitTime, startTime, finishTime, + isFinished, attempts, null, null); + } + + private static void verifyLoadedApp(RMState rmState, + ApplicationId appId, long submitTime, long startTime, long finishTime, + boolean isFinished, List attempts, + List amExitStatuses, + List finalStatuses) { + Map rmAppState = + rmState.getApplicationState(); + ApplicationStateData appState = rmAppState.get(appId); + assertNotNull(appId + " is not there in loaded apps", appState); + verifyLoadedApp(appState, appId, "test", submitTime, startTime, + isFinished ? RMAppState.FINISHED : null, finishTime, + isFinished ? "appDiagnostics" : ""); + // Check attempt state. + if (attempts != null) { + assertEquals("Attempts loaded for app " + appId, attempts.size(), + appState.attempts.size()); + if (finalStatuses != null && amExitStatuses != null) { + for (int i = 0; i < attempts.size(); i++) { + if (finalStatuses.get(i) != null) { + verifyLoadedAttempt(appState, attempts.get(i), + amExitStatuses.get(i), true); + } else { + verifyLoadedAttempt(appState, attempts.get(i), + amExitStatuses.get(i), false); + } + } + } + } else { + assertEquals( + "Attempts loaded for app " + appId, 0, appState.attempts.size()); + } + } + + private static void verifyLoadedAttempt(ApplicationStateData appState, + ApplicationAttemptId attemptId, int amExitStatus, boolean isFinished) { + verifyLoadedAttempt(appState, attemptId, isFinished ? "myTrackingUrl" : + "N/A", ContainerId.newContainerId(attemptId, 1), null, + isFinished ? RMAppAttemptState.FINISHED : null, isFinished ? + "attemptDiagnostics" : "", 0, amExitStatus, + isFinished ? FinalApplicationStatus.SUCCEEDED : null); + } + + private static void verifyLoadedAttempt(ApplicationStateData appState, + ApplicationAttemptId attemptId, String trackingURL, + ContainerId masterContainerId, SecretKey clientTokenKey, + RMAppAttemptState state, String diagnostics, long finishTime, + int amExitStatus, FinalApplicationStatus finalStatus) { + ApplicationAttemptStateData attemptState = appState.getAttempt(attemptId); + // Check if attempt is loaded correctly + assertNotNull( + "Attempt " + attemptId + " should have been loaded.", attemptState); + assertEquals("Attempt Id in attempt state", + attemptId, attemptState.getAttemptId()); + assertEquals("Master Container Id in attempt state", + masterContainerId, attemptState.getMasterContainer().getId()); + if (null != clientTokenKey) { + assertArrayEquals("Client token key in attempt state", + clientTokenKey.getEncoded(), attemptState.getAppAttemptTokens(). + getSecretKey(RMStateStore.AM_CLIENT_TOKEN_MASTER_KEY_NAME)); + } + assertEquals("Attempt state", state, attemptState.getState()); + assertEquals("Finish time in attempt state", finishTime, + attemptState.getFinishTime()); + assertEquals("Diagnostics in attempt state", diagnostics, + attemptState.getDiagnostics()); + assertEquals("AM Container exit status in attempt state", amExitStatus, + attemptState.getAMContainerExitStatus()); + assertEquals("Final app status in attempt state", finalStatus, + attemptState.getFinalApplicationStatus()); + assertEquals("Tracking URL in attempt state", trackingURL, + attemptState.getFinalTrackingUrl()); + } + + private static ApplicationStateData createAppState( + ApplicationSubmissionContext ctxt, long submitTime, long startTime, + long finishTime, boolean isFinished) { + return ApplicationStateData.newInstance(submitTime, startTime, "test", + ctxt, isFinished ? RMAppState.FINISHED : null, isFinished ? + "appDiagnostics" : "", isFinished ? finishTime : 0, null); + } + + private static ApplicationAttemptStateData createFinishedAttempt( + ApplicationAttemptId attemptId, Container container, long startTime, + int amExitStatus) { + return ApplicationAttemptStateData.newInstance(attemptId, + container, null, startTime, RMAppAttemptState.FINISHED, + "myTrackingUrl", "attemptDiagnostics", FinalApplicationStatus.SUCCEEDED, + amExitStatus, 0, 0, 0, 0, 0); + } + + private ApplicationAttemptId storeAttempt(RMStateStore store, + TestDispatcher dispatcher, String appAttemptIdStr, + AMRMTokenSecretManager appTokenMgr, + ClientToAMTokenSecretManagerInRM clientToAMTokenMgr, + boolean createContainer) throws Exception { + ApplicationAttemptId attemptId = + ApplicationAttemptId.fromString(appAttemptIdStr); + Token appAttemptToken = null; + if (appTokenMgr != null) { + appAttemptToken = generateAMRMToken(attemptId, appTokenMgr); + } + SecretKey clientTokenKey = null; + if (clientToAMTokenMgr != null) { + clientTokenKey = clientToAMTokenMgr.createMasterKey(attemptId); + Credentials attemptCred = new Credentials(); + attemptCred.addSecretKey(RMStateStore.AM_CLIENT_TOKEN_MASTER_KEY_NAME, + clientTokenKey.getEncoded()); + } + ContainerId containerId = null; + if (createContainer) { + containerId = ContainerId.newContainerId(attemptId, 1); + } + storeAttempt(store, attemptId, containerId.toString(), appAttemptToken, + clientTokenKey, dispatcher); + return attemptId; + } + + private void finishAppWithAttempts(RMState state, RMStateStore store, + TestDispatcher dispatcher, ApplicationAttemptId attemptId, + long submitTime, long startTime, int amExitStatus, long finishTime, + boolean createNewApp) throws Exception { + ApplicationId appId = attemptId.getApplicationId(); + ApplicationStateData appStateNew = null; + if (createNewApp) { + ApplicationSubmissionContext context = + new ApplicationSubmissionContextPBImpl(); + context.setApplicationId(appId); + appStateNew = createAppState(context, submitTime, startTime, finishTime, + true); + } else { + ApplicationStateData appState = state.getApplicationState().get(appId); + appStateNew = createAppState(appState.getApplicationSubmissionContext(), + submitTime, startTime, finishTime, true); + appStateNew.attempts.putAll(appState.attempts); + } + store.updateApplicationState(appStateNew); + waitNotify(dispatcher); + Container container = new ContainerPBImpl(); + container.setId(ContainerId.newContainerId(attemptId, 1)); + ApplicationAttemptStateData newAttemptState = + createFinishedAttempt(attemptId, container, startTime, amExitStatus); + updateAttempt(store, dispatcher, newAttemptState); + } + + private void storeAppWithAttempts(RMStateStore store, + TestDispatcher dispatcher, ApplicationAttemptId attemptId, + long submitTime, long startTime) throws Exception { + storeAppWithAttempts(store, dispatcher, submitTime, startTime, null, null, + attemptId); + } + + private void storeApp(RMStateStore store, TestDispatcher dispatcher, + ApplicationId appId, long submitTime, long startTime) throws Exception { + storeApp(store, appId, submitTime, startTime); + waitNotify(dispatcher); + } + + private void storeAppWithAttempts(RMStateStore store, + TestDispatcher dispatcher, long submitTime, long startTime, + AMRMTokenSecretManager appTokenMgr, + ClientToAMTokenSecretManagerInRM clientToAMTokenMgr, + ApplicationAttemptId attemptId, ApplicationAttemptId... attemptIds) + throws Exception { + ApplicationId appId = attemptId.getApplicationId(); + storeApp(store, dispatcher, appId, submitTime, startTime); + storeAttempt(store, dispatcher, attemptId.toString(), appTokenMgr, + clientToAMTokenMgr, true); + for (ApplicationAttemptId attempt : attemptIds) { + storeAttempt(store, dispatcher, attempt.toString(), appTokenMgr, + clientToAMTokenMgr, true); + } + } + + private static void removeApps(RMStateStore store, + Map appWithAttempts) { + for (Map.Entry entry : + appWithAttempts.entrySet()) { + RMApp mockApp = createMockAppForRemove(entry.getKey(), entry.getValue()); + store.removeApplication(mockApp); + } + } + + private static void verifyAppPathPath(RMStateStore store, ApplicationId appId, + int splitIndex) throws Exception { + String appIdStr = appId.toString(); + String appParent = appIdStr.substring(0, appIdStr.length() - splitIndex); + String appPath = appIdStr.substring(appIdStr.length() - splitIndex); + String path = createPath(((ZKRMStateStore)store).znodeWorkingPath, + ZKRMStateStore.ROOT_ZNODE_NAME, ZKRMStateStore.RM_APP_ROOT, + ZKRMStateStore.RM_APP_ROOT_HIERARCHIES, String.valueOf(splitIndex), + appParent, appPath); + assertTrue("Application with id " + appIdStr + " does not exist as per " + + "split in state store.", ((ZKRMStateStore)store).exists(path)); + } + + private static void verifyAppInHierarchicalPath(RMStateStore store, + String appId, int splitIdx) throws Exception { + String path = createPath(((ZKRMStateStore)store).znodeWorkingPath, + ZKRMStateStore.ROOT_ZNODE_NAME, ZKRMStateStore.RM_APP_ROOT); + if (splitIdx != 0) { + path = createPath(path, ZKRMStateStore.RM_APP_ROOT_HIERARCHIES, + String.valueOf(splitIdx), appId.substring(0, appId.length() - + splitIdx), appId.substring(appId.length() - splitIdx)); + } else { + path = createPath(path, appId); + } + assertTrue(appId + " should exist in path " + path, + ((ZKRMStateStore)store).exists(createPath(path))); + } + + private static void assertHierarchicalPaths(RMStateStore store, + Map pathToApps) throws Exception { + for (Map.Entry entry : pathToApps.entrySet()) { + String path = createPath(((ZKRMStateStore)store).znodeWorkingPath, + ZKRMStateStore.ROOT_ZNODE_NAME, ZKRMStateStore.RM_APP_ROOT); + if (entry.getKey() != 0) { + path = createPath(path, ZKRMStateStore.RM_APP_ROOT_HIERARCHIES, + String.valueOf(entry.getKey())); + } + assertEquals("Number of childrens for path " + path, + (int) entry.getValue(), + ((ZKRMStateStore)store).getChildren(path).size()); + } + } + + // Test to verify storing of apps and app attempts in ZK state store with app + // node split index configured more than 0. + @Test + public void testAppNodeSplit() throws Exception { + TestZKRMStateStoreTester zkTester = new TestZKRMStateStoreTester(); + long submitTime = System.currentTimeMillis(); + long startTime = submitTime + 1234; + Configuration conf = new YarnConfiguration(); + + // Get store with app node split config set as 1. + RMStateStore store = zkTester.getRMStateStore(createConfForAppNodeSplit(1)); + TestDispatcher dispatcher = new TestDispatcher(); + store.setRMDispatcher(dispatcher); + + // Create RM Context and app token manager. + RMContext rmContext = mock(RMContext.class); + when(rmContext.getStateStore()).thenReturn(store); + AMRMTokenSecretManager appTokenMgr = + spy(new AMRMTokenSecretManager(conf, rmContext)); + MasterKeyData masterKeyData = appTokenMgr.createNewMasterKey(); + when(appTokenMgr.getMasterKey()).thenReturn(masterKeyData); + ClientToAMTokenSecretManagerInRM clientToAMTokenMgr = + new ClientToAMTokenSecretManagerInRM(); + + // Store app1. + ApplicationId appId1 = ApplicationId.newInstance(1352994193343L, 1); + ApplicationAttemptId attemptId1 = + ApplicationAttemptId.newInstance(appId1, 1); + ApplicationAttemptId attemptId2 = + ApplicationAttemptId.newInstance(appId1, 2); + storeAppWithAttempts(store, dispatcher, submitTime, startTime, + appTokenMgr, clientToAMTokenMgr, attemptId1, attemptId2); + + // Store app2 with app id application_1352994193343_120213. + ApplicationId appId21 = ApplicationId.newInstance(1352994193343L, 120213); + storeApp(store, appId21, submitTime, startTime); + waitNotify(dispatcher); + + // Store another app which will be removed. + ApplicationId appIdRemoved = ApplicationId.newInstance(1352994193343L, 2); + ApplicationAttemptId attemptIdRemoved = + ApplicationAttemptId.newInstance(appIdRemoved, 1); + storeAppWithAttempts(store, dispatcher, submitTime, startTime, + null, null, attemptIdRemoved); + // Remove the app. + RMApp mockRemovedApp = + createMockAppForRemove(appIdRemoved, attemptIdRemoved); + store.removeApplication(mockRemovedApp); + // Close state store + store.close(); + + // Load state store + store = zkTester.getRMStateStore(createConfForAppNodeSplit(1)); + store.setRMDispatcher(dispatcher); + RMState state = store.loadState(); + // Check if application_1352994193343_120213 (i.e. app2) exists in state + // store as per split index. + verifyAppPathPath(store, appId21, 1); + + // Verify loaded apps and attempts based on the operations we did before + // reloading the state store. + verifyLoadedApp(state, appId1, submitTime, startTime, 0, false, + Lists.newArrayList(attemptId1, attemptId2), Lists.newArrayList(-1000, + -1000), Lists.newArrayList((FinalApplicationStatus) null, null)); + + // Update app state for app1. + finishAppWithAttempts(state, store, dispatcher, attemptId2, submitTime, + startTime, 100, 1234, false); + + // Test updating app/attempt for app whose initial state is not saved + ApplicationId dummyAppId = ApplicationId.newInstance(1234, 10); + ApplicationAttemptId dummyAttemptId = + ApplicationAttemptId.newInstance(dummyAppId, 6); + finishAppWithAttempts(state, store, dispatcher, dummyAttemptId, submitTime, + startTime, 111, 1234, true); + // Close the store + store.close(); + + // Check updated application state. + store = zkTester.getRMStateStore(createConfForAppNodeSplit(1)); + store.setRMDispatcher(dispatcher); + RMState newRMState = store.loadState(); + verifyLoadedApp(newRMState, dummyAppId, submitTime, startTime, 1234, true, + Lists.newArrayList(dummyAttemptId), Lists.newArrayList(111), + Lists.newArrayList(FinalApplicationStatus.SUCCEEDED)); + verifyLoadedApp(newRMState, appId1, submitTime, startTime, 1234, true, + Lists.newArrayList(attemptId1, attemptId2), + Lists.newArrayList(-1000, 100), Lists.newArrayList(null, + FinalApplicationStatus.SUCCEEDED)); + + // assert store is in expected state after everything is cleaned + assertTrue("Store is not in expected state", zkTester.isFinalStateValid()); + store.close(); + } + + // Test to verify storing of apps and app attempts in ZK state store with app + // node split index config changing across restarts. + @Test + public void testAppNodeSplitChangeAcrossRestarts() throws Exception { + TestZKRMStateStoreTester zkTester = new TestZKRMStateStoreTester(); + long submitTime = System.currentTimeMillis(); + long startTime = submitTime + 1234; + Configuration conf = new YarnConfiguration(); + + // Create store with app node split set as 1. + RMStateStore store = zkTester.getRMStateStore(createConfForAppNodeSplit(1)); + TestDispatcher dispatcher = new TestDispatcher(); + store.setRMDispatcher(dispatcher); + RMContext rmContext = mock(RMContext.class); + when(rmContext.getStateStore()).thenReturn(store); + AMRMTokenSecretManager appTokenMgr = + spy(new AMRMTokenSecretManager(conf, rmContext)); + MasterKeyData masterKeyData = appTokenMgr.createNewMasterKey(); + when(appTokenMgr.getMasterKey()).thenReturn(masterKeyData); + ClientToAMTokenSecretManagerInRM clientToAMTokenMgr = + new ClientToAMTokenSecretManagerInRM(); + + // Store app1 with 2 attempts. + ApplicationId appId1 = ApplicationId.newInstance(1442994194053L, 1); + ApplicationAttemptId attemptId1 = + ApplicationAttemptId.newInstance(appId1, 1); + ApplicationAttemptId attemptId2 = + ApplicationAttemptId.newInstance(appId1, 2); + storeAppWithAttempts(store, dispatcher, submitTime, startTime, + appTokenMgr, clientToAMTokenMgr, attemptId1, attemptId2); + + // Store app2 and associated attempt. + ApplicationId appId11 = ApplicationId.newInstance(1442994194053L, 2); + ApplicationAttemptId attemptId11 = + ApplicationAttemptId.newInstance(appId11, 1); + storeAppWithAttempts(store, dispatcher, attemptId11, submitTime, startTime); + // Close state store + store.close(); + + // Load state store with app node split config of 2. + store = zkTester.getRMStateStore(createConfForAppNodeSplit(2)); + store.setRMDispatcher(dispatcher); + RMState state = store.loadState(); + ApplicationId appId21 = ApplicationId.newInstance(1442994194053L, 120213); + storeApp(store, dispatcher, appId21, submitTime, startTime); + + // Check if app is loaded correctly despite change in split index. + verifyLoadedApp(state, appId1, submitTime, startTime, 0, false, + Lists.newArrayList(attemptId1, attemptId2), Lists.newArrayList(-1000, + -1000), Lists.newArrayList((FinalApplicationStatus) null, null)); + + // Finish app/attempt state + finishAppWithAttempts(state, store, dispatcher, attemptId2, submitTime, + startTime, 100, 1234, false); + + // Test updating app/attempt for app whose initial state is not saved + ApplicationId dummyAppId = ApplicationId.newInstance(1234, 10); + ApplicationAttemptId dummyAttemptId = + ApplicationAttemptId.newInstance(dummyAppId, 6); + finishAppWithAttempts(state, store, dispatcher, dummyAttemptId, submitTime, + startTime, 111, 1234, true); + // Close the store + store.close(); + + // Load state store this time with split index of 0. + store = zkTester.getRMStateStore(createConfForAppNodeSplit(0)); + store.setRMDispatcher(dispatcher); + state = store.loadState(); + assertEquals("Number of Apps loaded should be 4.", 4, + state.getApplicationState().size()); + verifyLoadedApp(state, appId1, submitTime, startTime, 1234, true, + Lists.newArrayList(attemptId1, attemptId2), Lists.newArrayList(-1000, + 100), Lists.newArrayList(null, FinalApplicationStatus.SUCCEEDED)); + // Remove attempt1 + store.removeApplicationAttempt(attemptId1); + ApplicationId appId31 = ApplicationId.newInstance(1442994195071L, 45); + storeApp(store, dispatcher, appId31, submitTime, startTime); + // Close state store. + store.close(); + + // Load state store with split index of 3. + store = zkTester.getRMStateStore(createConfForAppNodeSplit(3)); + store.setRMDispatcher(dispatcher); + state = store.loadState(); + assertEquals("Number of apps loaded should be 5.", 5, + state.getApplicationState().size()); + verifyLoadedApp(state, dummyAppId, submitTime, startTime, 1234, true, + Lists.newArrayList(dummyAttemptId), Lists.newArrayList(111), + Lists.newArrayList(FinalApplicationStatus.SUCCEEDED)); + verifyLoadedApp(state, appId31, submitTime, startTime, 0, false, null); + verifyLoadedApp(state, appId21, submitTime, startTime, 0, false, null); + verifyLoadedApp(state, appId11, submitTime, startTime, 0, false, + Lists.newArrayList(attemptId11), Lists.newArrayList(-1000), + Lists.newArrayList((FinalApplicationStatus) null)); + verifyLoadedApp(state, appId1, submitTime, startTime, 1234, true, + Lists.newArrayList(attemptId2), Lists.newArrayList(100), + Lists.newArrayList(FinalApplicationStatus.SUCCEEDED)); + + // Store another app. + ApplicationId appId41 = ApplicationId.newInstance(1442994195087L, 1); + storeApp(store, dispatcher, appId41, submitTime, startTime); + // Check how many apps exist in each of the hierarchy based paths. 0 paths + // should exist in "HIERARCHIES/4" path as app split index was never set + // as 4 in tests above. + assertHierarchicalPaths(store, ImmutableMap.of(0, 2, 1, 1, 2, 2, + 3, 1, 4, 0)); + verifyAppInHierarchicalPath(store, "application_1442994195087_0001", 3); + + ApplicationId appId71 = ApplicationId.newInstance(1442994195087L, 7); + //storeApp(store, dispatcher, appId71, submitTime, startTime); + storeApp(store, appId71, submitTime, startTime); + waitNotify(dispatcher); + ApplicationAttemptId attemptId71 = + ApplicationAttemptId.newInstance(appId71, 1); + storeAttempt(store, ApplicationAttemptId.newInstance(appId71, 1), + ContainerId.newContainerId(attemptId71, 1).toString(), null, null, + dispatcher); + // Remove applications. + removeApps(store, ImmutableMap.of(appId11, new ApplicationAttemptId[] + {attemptId11}, appId71, new ApplicationAttemptId[] {attemptId71}, + appId41, new ApplicationAttemptId[0], appId31, + new ApplicationAttemptId[0], appId21, new ApplicationAttemptId[0])); + removeApps(store, ImmutableMap.of(dummyAppId, + new ApplicationAttemptId[] {dummyAttemptId}, appId1, + new ApplicationAttemptId[] {attemptId1, attemptId2})); + store.close(); + + // Load state store with split index of 3 again. As all apps have been + // removed nothing should be loaded back. + store = zkTester.getRMStateStore(createConfForAppNodeSplit(3)); + store.setRMDispatcher(dispatcher); + state = store.loadState(); + assertEquals("Number of apps loaded should be 0.", 0, + state.getApplicationState().size()); + // Close the state store. + store.close(); + } }