YARN-2962. ZKRMStateStore: Limit the number of znodes under a znode (Contributed by Varun Sexena via Daniel Templeton)
(cherry picked from commit 2e52789edf68016e7a3f450164f8bd3d8e6cb210)
This commit is contained in:
@ -532,6 +532,10 @@ public static boolean isAclEnabled(Configuration conf) {
public static final String RM_ZK_TIMEOUT_MS = RM_ZK_PREFIX + "timeout-ms";
public static final int DEFAULT_RM_ZK_TIMEOUT_MS = 10000;
public static final String ZK_APPID_NODE_SPLIT_INDEX =
RM_ZK_PREFIX + "appid-node.split-index";
public static final int DEFAULT_ZK_APPID_NODE_SPLIT_INDEX = 0;
public static final String RM_ZK_ACL = RM_ZK_PREFIX + "acl";
public static final String DEFAULT_RM_ZK_ACL = "world:anyone:rwcda";
@ -605,7 +605,27 @@
<description>Name of the cluster. In an HA setting,
<description>Index at which last section of application id (with each section
separated by _ in application id) will be split so that application znode
stored in zookeeper RM state store will be stored as two different znodes
(parent-child). Split is done from the end.
For instance, with no split, appid znode will be of the form
application_1352994193343_0001. If the value of this config is 1, the
appid znode will be broken into two parts application_1352994193343_000
and 1 respectively with former being the parent node.
application_1352994193343_0002 will then be stored as 2 under the parent
node application_1352994193343_000. This config can take values from 0 to 4.
0 means there will be no split. If configuration value is outside this
range, it will be treated as config value of 0(i.e. no split). A value
larger than 0 (up to 4) should be configured if you are storing a large number
of apps in ZK based RM state store and state store operations are failing due to
LenError in Zookeeper.</description>
<description>Name of the cluster. In a HA setting,
this is used to ensure the RM participates in leader
election for this cluster and ensures it does not affect
other clusters</description>
@ -57,6 +57,7 @@
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.ApplicationStateDataPBImpl;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.EpochPBImpl;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.data.ACL;
import org.apache.zookeeper.data.Id;
@ -72,6 +73,8 @@
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
* {@link RMStateStore} implementation backed by ZooKeeper.
@ -82,6 +85,31 @@
* |--- RM_APP_ROOT
* | |----- HIERARCHIES
* | | |----- 1
* | | | |----- (#ApplicationId barring last character)
* | | | | |----- (#Last character of ApplicationId)
* | | | | | |----- (#ApplicationAttemptIds)
* | | | ....
* | | |
* | | |----- 2
* | | | |----- (#ApplicationId barring last 2 characters)
* | | | | |----- (#Last 2 characters of ApplicationId)
* | | | | | |----- (#ApplicationAttemptIds)
* | | | ....
* | | |
* | | |----- 3
* | | | |----- (#ApplicationId barring last 3 characters)
* | | | | |----- (#Last 3 characters of ApplicationId)
* | | | | | |----- (#ApplicationAttemptIds)
* | | | ....
* | | |
* | | |----- 4
* | | | |----- (#ApplicationId barring last 4 characters)
* | | | | |----- (#Last 4 characters of ApplicationId)
* | | | | | |----- (#ApplicationAttemptIds)
* | | | ....
* | | |
* | |----- (#ApplicationId1)
* | | |----- (#ApplicationAttemptIds)
* | |
@ -121,6 +149,7 @@
public class ZKRMStateStore extends RMStateStore {
private static final Log LOG = LogFactory.getLog(ZKRMStateStore.class);
private static final String RM_DELEGATION_TOKENS_ROOT_ZNODE_NAME =
private static final String RM_DT_SEQUENTIAL_NUMBER_ZNODE_NAME =
@ -129,12 +158,15 @@ public class ZKRMStateStore extends RMStateStore {
public static final String ROOT_ZNODE_NAME = "ZKRMStateRoot";
protected static final Version CURRENT_VERSION_INFO =
Version.newInstance(1, 3);
protected static final Version CURRENT_VERSION_INFO = Version
.newInstance(2, 0);
public static final String RM_APP_ROOT_HIERARCHIES = "HIERARCHIES";
/* Znode paths */
private String zkRootNodePath;
private String rmAppRoot;
private Map<Integer, String> rmAppRootHierarchies;
private String rmDTSecretManagerRoot;
private String dtMasterKeysRootPath;
private String delegationTokensRootPath;
@ -144,6 +176,7 @@ public class ZKRMStateStore extends RMStateStore {
protected String znodeWorkingPath;
private int appIdNodeSplitIndex = 0;
/* Fencing related variables */
private static final String FENCING_LOCK = "RM_ZK_FENCING_LOCK";
@ -165,6 +198,27 @@ public class ZKRMStateStore extends RMStateStore {
protected CuratorFramework curatorFramework;
* Indicates different app attempt state store operations.
private enum AppAttemptOp {
* Encapsulates full app node path and corresponding split index.
private final static class AppNodeSplitInfo {
private final String path;
private final int splitIndex;
AppNodeSplitInfo(String path, int splitIndex) {
this.path = path;
this.splitIndex = splitIndex;
* Given the {@link Configuration} and {@link ACL}s used (sourceACLs) for
* ZooKeeper access, construct the {@link ACL}s for the store's root node.
@ -212,11 +266,30 @@ public synchronized void initInternal(Configuration conf)
zkRootNodePath = getNodePath(znodeWorkingPath, ROOT_ZNODE_NAME);
fencingNodePath = getNodePath(zkRootNodePath, FENCING_LOCK);
rmAppRoot = getNodePath(zkRootNodePath, RM_APP_ROOT);
String hierarchiesPath = getNodePath(rmAppRoot, RM_APP_ROOT_HIERARCHIES);
rmAppRootHierarchies = new HashMap<>(5);
rmAppRootHierarchies.put(0, rmAppRoot);
for (int splitIndex = 1; splitIndex <= 4; splitIndex++) {
getNodePath(hierarchiesPath, Integer.toString(splitIndex)));
fencingNodePath = getNodePath(zkRootNodePath, FENCING_LOCK);
zkSessionTimeout = conf.getInt(YarnConfiguration.RM_ZK_TIMEOUT_MS,
appIdNodeSplitIndex =
if (appIdNodeSplitIndex < 1 || appIdNodeSplitIndex > 4) {
LOG.info("Invalid value " + appIdNodeSplitIndex + " for config " +
YarnConfiguration.ZK_APPID_NODE_SPLIT_INDEX + " specified. " +
"Resetting it to " +
appIdNodeSplitIndex = YarnConfiguration.DEFAULT_ZK_APPID_NODE_SPLIT_INDEX;
zkAcl = RMZKUtils.getZKAcls(conf);
if (HAUtil.isHAEnabled(conf)) {
@ -269,6 +342,10 @@ public synchronized void startInternal() throws Exception {
create(getNodePath(rmAppRoot, RM_APP_ROOT_HIERARCHIES));
for (int splitIndex = 1; splitIndex <= 4; splitIndex++) {
@ -525,42 +602,64 @@ private void loadRMDelegationTokenState(RMState rmState) throws Exception {
private void loadRMAppStateFromAppNode(RMState rmState, String appNodePath,
String appIdStr) throws Exception {
byte[] appData = getData(appNodePath);
if (LOG.isDebugEnabled()) {
LOG.debug("Loading application from znode: " + appNodePath);
ApplicationId appId = ApplicationId.fromString(appIdStr);
ApplicationStateDataPBImpl appState = new ApplicationStateDataPBImpl(
if (!appId.equals(
appState.getApplicationSubmissionContext().getApplicationId())) {
throw new YarnRuntimeException("The node name is different from the " +
"application id");
rmState.appState.put(appId, appState);
loadApplicationAttemptState(appState, appNodePath);
private synchronized void loadRMAppState(RMState rmState) throws Exception {
List<String> childNodes = getChildren(rmAppRoot);
for (String childNodeName : childNodes) {
String childNodePath = getNodePath(rmAppRoot, childNodeName);
byte[] childData = getData(childNodePath);
if (childNodeName.startsWith(ApplicationId.appIdStrPrefix)) {
// application
if (LOG.isDebugEnabled()) {
LOG.debug("Loading application from znode: " + childNodeName);
for (int splitIndex = 0; splitIndex <= 4; splitIndex++) {
String appRoot = rmAppRootHierarchies.get(splitIndex);
if (appRoot == null) {
List<String> childNodes = getChildren(appRoot);
boolean appNodeFound = false;
for (String childNodeName : childNodes) {
if (childNodeName.startsWith(ApplicationId.appIdStrPrefix)) {
appNodeFound = true;
if (splitIndex == 0) {
getNodePath(appRoot, childNodeName), childNodeName);
} else {
// If AppId Node is partitioned.
String parentNodePath = getNodePath(appRoot, childNodeName);
List<String> leafNodes = getChildren(parentNodePath);
for (String leafNodeName : leafNodes) {
String appIdStr = childNodeName + leafNodeName;
getNodePath(parentNodePath, leafNodeName), appIdStr);
} else {
LOG.info("Unknown child node with name: " + childNodeName);
ApplicationId appId = ApplicationId.fromString(childNodeName);
ApplicationStateDataPBImpl appState =
new ApplicationStateDataPBImpl(
if (!appId.equals(
appState.getApplicationSubmissionContext().getApplicationId())) {
throw new YarnRuntimeException("The child node name is different "
+ "from the application id");
rmState.appState.put(appId, appState);
loadApplicationAttemptState(appState, appId);
} else {
LOG.info("Unknown child node with name: " + childNodeName);
if (splitIndex != appIdNodeSplitIndex && !appNodeFound) {
// If no loaded app exists for a particular split index and the split
// index for which apps are being loaded is not the one configured, then
// we do not need to keep track of this hierarchy for storing/updating/
// removing app/app attempt znodes.
private void loadApplicationAttemptState(ApplicationStateData appState,
ApplicationId appId)
throws Exception {
String appPath = getNodePath(rmAppRoot, appId.toString());
String appPath) throws Exception {
List<String> attempts = getChildren(appPath);
for (String attemptIDStr : attempts) {
@ -575,14 +674,68 @@ private void loadApplicationAttemptState(ApplicationStateData appState,
appState.attempts.put(attemptState.getAttemptId(), attemptState);
LOG.debug("Done loading applications from ZK state store");
* Get parent app node path based on full path and split index supplied.
* @param appIdPath App id path for which parent needs to be returned.
* @param splitIndex split index.
* @return parent app node path.
private String getSplitAppNodeParent(String appIdPath, int splitIndex) {
// Calculated as string upto index (appIdPath Length - split index - 1). We
// deduct 1 to exclude path separator.
return appIdPath.substring(0, appIdPath.length() - splitIndex - 1);
* Checks if parent app node has no leaf nodes and if it does not have,
* removes it. Called while removing application.
* @param appIdPath path of app id to be removed.
* @param splitIndex split index.
* @throws Exception if any problem occurs while performing ZK operation.
private void checkRemoveParentAppNode(String appIdPath, int splitIndex)
throws Exception {
if (splitIndex != 0) {
String parentAppNode = getSplitAppNodeParent(appIdPath, splitIndex);
List<String> children = null;
try {
children = getChildren(parentAppNode);
} catch (KeeperException.NoNodeException ke) {
// It should be fine to swallow this exception as the parent app node we
// intend to delete is already deleted.
if (LOG.isDebugEnabled()) {
LOG.debug("Unable to remove app parent node " + parentAppNode +
" as it does not exist.");
// No apps stored under parent path.
if (children != null && children.isEmpty()) {
try {
if (LOG.isDebugEnabled()) {
LOG.debug("No leaf app node exists. Removing parent node " +
} catch (KeeperException.NotEmptyException ke) {
// It should be fine to swallow this exception as the parent app node
// has to be deleted only if it has no children. And this node has.
if (LOG.isDebugEnabled()) {
LOG.debug("Unable to remove app parent node " + parentAppNode +
" as it has children.");
public synchronized void storeApplicationStateInternal(ApplicationId appId,
ApplicationStateData appStateDataPB) throws Exception {
String nodeCreatePath = getNodePath(rmAppRoot, appId.toString());
String nodeCreatePath = getLeafAppIdNodePath(appId.toString(), true);
if (LOG.isDebugEnabled()) {
LOG.debug("Storing info for app: " + appId + " at: " + nodeCreatePath);
@ -597,7 +750,26 @@ public synchronized void storeApplicationStateInternal(ApplicationId appId,
protected synchronized void updateApplicationStateInternal(
ApplicationId appId, ApplicationStateData appStateDataPB)
throws Exception {
String nodeUpdatePath = getNodePath(rmAppRoot, appId.toString());
String nodeUpdatePath = getLeafAppIdNodePath(appId.toString(), false);
boolean pathExists = true;
// Look for paths based on other split indices if path as per split index
// does not exist.
if (!exists(nodeUpdatePath)) {
AppNodeSplitInfo alternatePathInfo = getAlternatePath(appId.toString());
if (alternatePathInfo != null) {
nodeUpdatePath = alternatePathInfo.path;
} else {
// No alternate path exists. Create path as per configured split index.
pathExists = false;
if (appIdNodeSplitIndex != 0) {
String rootNode =
getSplitAppNodeParent(nodeUpdatePath, appIdNodeSplitIndex);
if (!exists(rootNode)) {
safeCreate(rootNode, null, zkAcl, CreateMode.PERSISTENT);
if (LOG.isDebugEnabled()) {
LOG.debug("Storing final state info for app: " + appId + " at: "
@ -606,34 +778,79 @@ protected synchronized void updateApplicationStateInternal(
byte[] appStateData = appStateDataPB.getProto().toByteArray();
if (exists(nodeUpdatePath)) {
if (pathExists) {
safeSetData(nodeUpdatePath, appStateData, -1);
} else {
safeCreate(nodeUpdatePath, appStateData, zkAcl,
safeCreate(nodeUpdatePath, appStateData, zkAcl, CreateMode.PERSISTENT);
if (LOG.isDebugEnabled()) {
LOG.debug(appId + " znode didn't exist. Created a new znode to"
+ " update the application state.");
LOG.debug("Path " + nodeUpdatePath + " for " + appId + " didn't " +
"exist. Creating a new znode to update the application state.");
* Handles store, update and remove application attempt state store
* operations.
private void handleApplicationAttemptStateOp(
ApplicationAttemptId appAttemptId,
ApplicationAttemptStateData attemptStateDataPB, AppAttemptOp operation)
throws Exception {
String appId = appAttemptId.getApplicationId().toString();
String appDirPath = getLeafAppIdNodePath(appId, false);
// Look for paths based on other split indices.
if (!exists(appDirPath)) {
AppNodeSplitInfo alternatePathInfo = getAlternatePath(appId);
if (alternatePathInfo == null) {
if (operation == AppAttemptOp.REMOVE) {
// Unexpected. Assume that app attempt has been deleted.
} else { // Store or Update operation
throw new YarnRuntimeException("Unexpected Exception. App node for " +
"app " + appId + " not found");
} else {
appDirPath = alternatePathInfo.path;
String path = getNodePath(appDirPath, appAttemptId.toString());
byte[] attemptStateData = (attemptStateDataPB == null) ? null :
if (LOG.isDebugEnabled()) {
LOG.debug(operation + " info for attempt: " + appAttemptId + " at: "
+ path);
switch (operation) {
case UPDATE:
if (exists(path)) {
safeSetData(path, attemptStateData, -1);
} else {
safeCreate(path, attemptStateData, zkAcl, CreateMode.PERSISTENT);
if (LOG.isDebugEnabled()) {
LOG.debug("Path " + path + " for " + appAttemptId + " didn't exist." +
" Created a new znode to update the application attempt state.");
case STORE:
safeCreate(path, attemptStateData, zkAcl, CreateMode.PERSISTENT);
case REMOVE:
protected synchronized void storeApplicationAttemptStateInternal(
ApplicationAttemptId appAttemptId,
ApplicationAttemptStateData attemptStateDataPB)
throws Exception {
String appDirPath = getNodePath(rmAppRoot,
String nodeCreatePath = getNodePath(appDirPath, appAttemptId.toString());
if (LOG.isDebugEnabled()) {
LOG.debug("Storing info for attempt: " + appAttemptId + " at: "
+ nodeCreatePath);
byte[] attemptStateData = attemptStateDataPB.getProto().toByteArray();
safeCreate(nodeCreatePath, attemptStateData, zkAcl, CreateMode.PERSISTENT);
handleApplicationAttemptStateOp(appAttemptId, attemptStateDataPB,
@ -641,65 +858,73 @@ protected synchronized void updateApplicationAttemptStateInternal(
ApplicationAttemptId appAttemptId,
ApplicationAttemptStateData attemptStateDataPB)
throws Exception {
String appIdStr = appAttemptId.getApplicationId().toString();
String appAttemptIdStr = appAttemptId.toString();
String appDirPath = getNodePath(rmAppRoot, appIdStr);
String nodeUpdatePath = getNodePath(appDirPath, appAttemptIdStr);
if (LOG.isDebugEnabled()) {
LOG.debug("Storing final state info for attempt: " + appAttemptIdStr
+ " at: " + nodeUpdatePath);
byte[] attemptStateData = attemptStateDataPB.getProto().toByteArray();
if (exists(nodeUpdatePath)) {
safeSetData(nodeUpdatePath, attemptStateData, -1);
} else {
safeCreate(nodeUpdatePath, attemptStateData, zkAcl,
if (LOG.isDebugEnabled()) {
LOG.debug(appAttemptId + " znode didn't exist. Created a new znode to"
+ " update the application attempt state.");
handleApplicationAttemptStateOp(appAttemptId, attemptStateDataPB,
protected synchronized void removeApplicationAttemptInternal(
ApplicationAttemptId appAttemptId) throws Exception {
String appId = appAttemptId.getApplicationId().toString();
String appIdRemovePath = getNodePath(rmAppRoot, appId);
String attemptIdRemovePath =
getNodePath(appIdRemovePath, appAttemptId.toString());
if (LOG.isDebugEnabled()) {
LOG.debug("Removing info for attempt: " + appAttemptId + " at: "
+ attemptIdRemovePath);
handleApplicationAttemptStateOp(appAttemptId, null, AppAttemptOp.REMOVE);
protected synchronized void removeApplicationStateInternal(
ApplicationStateData appState) throws Exception {
String appId = appState.getApplicationSubmissionContext().getApplicationId()
String appIdRemovePath = getNodePath(rmAppRoot, appId);
getApplicationId().toString(), true, appState.attempts.keySet());
if (LOG.isDebugEnabled()) {
LOG.debug("Removing info for app: " + appId + " at: " + appIdRemovePath
+ " and its attempts.");
private void removeApp(String removeAppId) throws Exception {
removeApp(removeAppId, false, null);
* Remove application node and its attempt nodes.
* @param removeAppId Application Id to be removed.
* @param safeRemove Flag indicating if application and attempt nodes have to
* be removed safely under a fencing or not.
* @param attempts list of attempts to be removed associated with this app.
* Ignored if safeRemove flag is false as we recursively delete all the
* child nodes directly.
* @throws Exception if any exception occurs during ZK operation.
private void removeApp(String removeAppId, boolean safeRemove,
Set<ApplicationAttemptId> attempts) throws Exception {
String appIdRemovePath = getLeafAppIdNodePath(removeAppId, false);
int splitIndex = appIdNodeSplitIndex;
// Look for paths based on other split indices if path as per configured
// split index does not exist.
if (!exists(appIdRemovePath)) {
AppNodeSplitInfo alternatePathInfo = getAlternatePath(removeAppId);
if (alternatePathInfo != null) {
appIdRemovePath = alternatePathInfo.path;
splitIndex = alternatePathInfo.splitIndex;
} else {
// Alternate path not found so return.
for (ApplicationAttemptId attemptId : appState.attempts.keySet()) {
String attemptRemovePath =
getNodePath(appIdRemovePath, attemptId.toString());
if (safeRemove) {
if (LOG.isDebugEnabled()) {
LOG.debug("Removing info for app: " + removeAppId + " at: " +
appIdRemovePath + " and its attempts.");
if (attempts != null) {
for (ApplicationAttemptId attemptId : attempts) {
String attemptRemovePath =
getNodePath(appIdRemovePath, attemptId.toString());
} else {
// Check if we should remove the parent app node as well.
checkRemoveParentAppNode(appIdRemovePath, splitIndex);
@ -821,8 +1046,7 @@ public synchronized void deleteStore() throws Exception {
public synchronized void removeApplication(ApplicationId removeAppId)
throws Exception {
String appIdRemovePath = getNodePath(rmAppRoot, removeAppId.toString());
@ -921,6 +1145,79 @@ private void createRootDirRecursively(String path) throws Exception {
* Get alternate path for app id if path according to configured split index
* does not exist. We look for path based on all possible split indices.
* @param appId
* @return a {@link AppNodeSplitInfo} object containing the path and split
* index if it exists, null otherwise.
* @throws Exception if any problem occurs while performing ZK operation.
private AppNodeSplitInfo getAlternatePath(String appId) throws Exception {
for (Map.Entry<Integer, String> entry : rmAppRootHierarchies.entrySet()) {
// Look for other paths
int splitIndex = entry.getKey();
if (splitIndex != appIdNodeSplitIndex) {
String alternatePath =
getLeafAppIdNodePath(appId, entry.getValue(), splitIndex, false);
if (exists(alternatePath)) {
return new AppNodeSplitInfo(alternatePath, splitIndex);
return null;
* Returns leaf app node path based on app id and passed split index. If the
* passed flag createParentIfNotExists is true, also creates the parent app
* node if it does not exist.
* @param appId application id.
* @param rootNode app root node based on split index.
* @param appIdNodeSplitIdx split index.
* @param createParentIfNotExists flag which determines if parent app node
* needs to be created(as per split) if it does not exist.
* @return leaf app node path.
* @throws Exception if any problem occurs while performing ZK operation.
private String getLeafAppIdNodePath(String appId, String rootNode,
int appIdNodeSplitIdx, boolean createParentIfNotExists) throws Exception {
if (appIdNodeSplitIdx == 0) {
return getNodePath(rootNode, appId);
String nodeName = appId;
int splitIdx = nodeName.length() - appIdNodeSplitIdx;
String rootNodePath =
getNodePath(rootNode, nodeName.substring(0, splitIdx));
if (createParentIfNotExists && !exists(rootNodePath)) {
try {
safeCreate(rootNodePath, null, zkAcl, CreateMode.PERSISTENT);
} catch (KeeperException.NodeExistsException e) {
if (LOG.isDebugEnabled()) {
LOG.debug("Unable to create app parent node " + rootNodePath +
" as it already exists.");
return getNodePath(rootNodePath, nodeName.substring(splitIdx));
* Returns leaf app node path based on app id and configured split index. If
* the passed flag createParentIfNotExists is true, also creates the parent
* app node if it does not exist.
* @param appId application id.
* @param createParentIfNotExists flag which determines if parent app node
* needs to be created(as per split) if it does not exist.
* @return leaf app node path.
* @throws Exception if any problem occurs while performing ZK operation.
private String getLeafAppIdNodePath(String appId,
boolean createParentIfNotExists) throws Exception {
return getLeafAppIdNodePath(appId, rmAppRootHierarchies.get(
appIdNodeSplitIndex), appIdNodeSplitIndex, createParentIfNotExists);
byte[] getData(final String path) throws Exception {
return curatorFramework.getData().forPath(path);
@ -931,11 +1228,13 @@ List<ACL> getACL(final String path) throws Exception {
return curatorFramework.getACL().forPath(path);
private List<String> getChildren(final String path) throws Exception {
List<String> getChildren(final String path) throws Exception {
return curatorFramework.getChildren().forPath(path);
private boolean exists(final String path) throws Exception {
boolean exists(final String path) throws Exception {
return curatorFramework.checkExists().forPath(path) != null;
@ -964,6 +1263,11 @@ private void safeCreate(String path, byte[] data, List<ACL> acl,
* Deletes the path. Checks for existence of path as well.
* @param path Path to be deleted.
* @throws Exception if any problem occurs while performing deletion.
private void safeDelete(final String path) throws Exception {
if (exists(path)) {
SafeTransaction transaction = new SafeTransaction();
@ -87,8 +87,13 @@ public void testRemoveApplicationFromStateStoreCmdForZK() throws Exception {
ZKRMStateStore.ROOT_ZNODE_NAME + "/" + RMStateStore.RM_APP_ROOT;
String appIdPath = appRootPath + "/" + appId;
assertEquals("Application node for " + appId + "should exist",
appId, curatorFramework.getChildren().forPath(appRootPath).get(0));
for (String path : curatorFramework.getChildren().forPath(appRootPath)) {
if (path.equals(ZKRMStateStore.RM_APP_ROOT_HIERARCHIES)) {
assertEquals("Application node for " + appId + " should exist",
appId, path);
try {
ResourceManager.removeApplication(conf, appId);
} catch (Exception e) {
@ -96,8 +101,10 @@ public void testRemoveApplicationFromStateStoreCmdForZK() throws Exception {
"rm state store.");
assertTrue("After remove app from store there should be no child nodes" +
" in app root path",
" for application in app root path",
curatorFramework.getChildren().forPath(appRootPath).size() == 1 &&
@ -156,8 +156,7 @@ void waitNotify(TestDispatcher dispatcher) {
protected RMApp storeApp(RMStateStore store, ApplicationId appId,
long submitTime,
long startTime) throws Exception {
long submitTime, long startTime) throws Exception {
ApplicationSubmissionContext context =
new ApplicationSubmissionContextPBImpl();
@ -200,6 +199,13 @@ protected RMAppAttempt storeAttempt(RMStateStore store,
return mockAttempt;
protected void updateAttempt(RMStateStore store, TestDispatcher dispatcher,
ApplicationAttemptStateData attemptState) {
dispatcher.attemptId = attemptState.getAttemptId();
void testRMAppStateStore(RMStateStoreHelper stateStoreHelper)
throws Exception {
testRMAppStateStore(stateStoreHelper, new StoreStateVerifier());
@ -28,6 +28,8 @@
import org.apache.hadoop.ha.HAServiceProtocol;
import org.apache.hadoop.ha.HAServiceProtocol.StateChangeRequestInfo;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.security.token.delegation.DelegationKey;
import org.apache.hadoop.service.Service;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
@ -40,19 +42,25 @@
import org.apache.hadoop.yarn.api.records.impl.pb.ContainerPBImpl;
import org.apache.hadoop.yarn.conf.HAUtil;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier;
import org.apache.hadoop.yarn.server.records.Version;
import org.apache.hadoop.yarn.server.records.impl.pb.VersionPBImpl;
import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.RMState;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationAttemptStateData;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationStateData;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.AggregateAppResourceUsage;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptMetrics;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
import org.apache.hadoop.yarn.server.resourcemanager.security.AMRMTokenSecretManager;
import org.apache.hadoop.yarn.server.resourcemanager.security.ClientToAMTokenSecretManagerInRM;
import org.apache.hadoop.yarn.server.security.MasterKeyData;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.ZooDefs.Perms;
import org.apache.zookeeper.data.ACL;
@ -61,13 +69,22 @@
import org.junit.Before;
import org.junit.Test;
import com.google.common.base.Joiner;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.when;
import java.io.IOException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import javax.crypto.SecretKey;
@ -130,9 +147,21 @@ public Version getCurrentVersion() {
private String getAppNode(String appId, int splitIdx) {
String rootPath = workingZnode + "/" + ROOT_ZNODE_NAME + "/" +
String appPath = appId;
if (splitIdx != 0) {
int idx = appId.length() - splitIdx;
appPath = appId.substring(0, idx) + "/" + appId.substring(idx);
return rootPath + "/" + RM_APP_ROOT_HIERARCHIES + "/" +
Integer.toString(splitIdx) + "/" + appPath;
return rootPath + "/" + appPath;
public String getAppNode(String appId) {
return workingZnode + "/" + ROOT_ZNODE_NAME + "/" + RM_APP_ROOT + "/"
+ appId;
return getAppNode(appId, 0);
public String getAttemptNode(String appId, String attemptId) {
@ -149,8 +178,7 @@ public void testRetryingCreateRootDir() throws Exception {
public RMStateStore getRMStateStore() throws Exception {
YarnConfiguration conf = new YarnConfiguration();
private RMStateStore createStore(Configuration conf) throws Exception {
workingZnode = "/jira/issue/3077/rmstore";
@ -159,6 +187,15 @@ public RMStateStore getRMStateStore() throws Exception {
return this.store;
public RMStateStore getRMStateStore(Configuration conf) throws Exception {
return createStore(conf);
public RMStateStore getRMStateStore() throws Exception {
YarnConfiguration conf = new YarnConfiguration();
return createStore(conf);
public boolean isFinalStateValid() throws Exception {
return 1 ==
@ -178,8 +215,12 @@ public Version getCurrentVersion() throws Exception {
public boolean appExists(RMApp app) throws Exception {
String appIdPath = app.getApplicationId().toString();
int split =
return null != curatorFramework.checkExists()
.forPath(store.getAppNode(appIdPath, split));
public boolean attemptExists(RMAppAttempt attempt) throws Exception {
@ -342,7 +383,6 @@ public void testZKRootPathAcls() throws Exception {
public void testFencing() throws Exception {
StateChangeRequestInfo req = new StateChangeRequestInfo(
@ -382,13 +422,15 @@ public void testFencing() throws Exception {
assertEquals("RM should be Active",
public void testFencedState() throws Exception {
TestZKRMStateStoreTester zkTester = new TestZKRMStateStoreTester();
RMStateStore store = zkTester.getRMStateStore();
RMStateStore store = zkTester.getRMStateStore();
// Move state to FENCED from ACTIVE
assertEquals("RMStateStore should have been in fenced state",
@ -527,4 +569,518 @@ public void testDuplicateRMAppDeletion() throws Exception {
private static String createPath(String... parts) {
return Joiner.on("/").join(parts);
private static Configuration createConfForAppNodeSplit(int splitIndex) {
YarnConfiguration conf = new YarnConfiguration();
conf.setInt(YarnConfiguration.ZK_APPID_NODE_SPLIT_INDEX, splitIndex);
return conf;
private static RMApp createMockAppForRemove(ApplicationId appId,
ApplicationAttemptId... attemptIds) {
RMApp app = mock(RMApp.class);
ApplicationSubmissionContextPBImpl context =
new ApplicationSubmissionContextPBImpl();
if (attemptIds.length > 0) {
HashMap<ApplicationAttemptId, RMAppAttempt> attempts = new HashMap<>();
for (ApplicationAttemptId attemptId : attemptIds) {
RMAppAttempt appAttempt = mock(RMAppAttempt.class);
attempts.put(attemptId, appAttempt);
return app;
private static void verifyLoadedApp(ApplicationStateData appState,
ApplicationId appId, String user, long submitTime, long startTime,
RMAppState state, long finishTime, String diagnostics) {
// Check if app is loaded correctly
assertNotNull("App " + appId + " should have been loaded.", appState);
assertEquals("App submit time in app state", submitTime,
assertEquals("App start time in app state", startTime,
assertEquals("App ID in app state", appId,
assertEquals("App state", state, appState.getState());
assertEquals("Finish time in app state", finishTime,
assertEquals("User in app state", user, appState.getUser());
assertEquals("Diagnostics in app state", diagnostics,
private static void verifyLoadedApp(RMState rmState,
ApplicationId appId, long submitTime, long startTime, long finishTime,
boolean isFinished, List<ApplicationAttemptId> attempts) {
verifyLoadedApp(rmState, appId, submitTime, startTime, finishTime,
isFinished, attempts, null, null);
private static void verifyLoadedApp(RMState rmState,
ApplicationId appId, long submitTime, long startTime, long finishTime,
boolean isFinished, List<ApplicationAttemptId> attempts,
List<Integer> amExitStatuses,
List<FinalApplicationStatus> finalStatuses) {
Map<ApplicationId, ApplicationStateData> rmAppState =
ApplicationStateData appState = rmAppState.get(appId);
assertNotNull(appId + " is not there in loaded apps", appState);
verifyLoadedApp(appState, appId, "test", submitTime, startTime,
isFinished ? RMAppState.FINISHED : null, finishTime,
isFinished ? "appDiagnostics" : "");
// Check attempt state.
if (attempts != null) {
assertEquals("Attempts loaded for app " + appId, attempts.size(),
if (finalStatuses != null && amExitStatuses != null) {
for (int i = 0; i < attempts.size(); i++) {
if (finalStatuses.get(i) != null) {
verifyLoadedAttempt(appState, attempts.get(i),
amExitStatuses.get(i), true);
} else {
verifyLoadedAttempt(appState, attempts.get(i),
amExitStatuses.get(i), false);
} else {
"Attempts loaded for app " + appId, 0, appState.attempts.size());
private static void verifyLoadedAttempt(ApplicationStateData appState,
ApplicationAttemptId attemptId, int amExitStatus, boolean isFinished) {
verifyLoadedAttempt(appState, attemptId, isFinished ? "myTrackingUrl" :
"N/A", ContainerId.newContainerId(attemptId, 1), null,
isFinished ? RMAppAttemptState.FINISHED : null, isFinished ?
"attemptDiagnostics" : "", 0, amExitStatus,
isFinished ? FinalApplicationStatus.SUCCEEDED : null);
private static void verifyLoadedAttempt(ApplicationStateData appState,
ApplicationAttemptId attemptId, String trackingURL,
ContainerId masterContainerId, SecretKey clientTokenKey,
RMAppAttemptState state, String diagnostics, long finishTime,
int amExitStatus, FinalApplicationStatus finalStatus) {
ApplicationAttemptStateData attemptState = appState.getAttempt(attemptId);
// Check if attempt is loaded correctly
"Attempt " + attemptId + " should have been loaded.", attemptState);
assertEquals("Attempt Id in attempt state",
attemptId, attemptState.getAttemptId());
assertEquals("Master Container Id in attempt state",
masterContainerId, attemptState.getMasterContainer().getId());
if (null != clientTokenKey) {
assertArrayEquals("Client token key in attempt state",
clientTokenKey.getEncoded(), attemptState.getAppAttemptTokens().
assertEquals("Attempt state", state, attemptState.getState());
assertEquals("Finish time in attempt state", finishTime,
assertEquals("Diagnostics in attempt state", diagnostics,
assertEquals("AM Container exit status in attempt state", amExitStatus,
assertEquals("Final app status in attempt state", finalStatus,
assertEquals("Tracking URL in attempt state", trackingURL,
private static ApplicationStateData createAppState(
ApplicationSubmissionContext ctxt, long submitTime, long startTime,
long finishTime, boolean isFinished) {
return ApplicationStateData.newInstance(submitTime, startTime, "test",
ctxt, isFinished ? RMAppState.FINISHED : null, isFinished ?
"appDiagnostics" : "", isFinished ? finishTime : 0, null);
private static ApplicationAttemptStateData createFinishedAttempt(
ApplicationAttemptId attemptId, Container container, long startTime,
int amExitStatus) {
return ApplicationAttemptStateData.newInstance(attemptId,
container, null, startTime, RMAppAttemptState.FINISHED,
"myTrackingUrl", "attemptDiagnostics", FinalApplicationStatus.SUCCEEDED,
amExitStatus, 0, 0, 0, 0, 0);
private ApplicationAttemptId storeAttempt(RMStateStore store,
TestDispatcher dispatcher, String appAttemptIdStr,
AMRMTokenSecretManager appTokenMgr,
ClientToAMTokenSecretManagerInRM clientToAMTokenMgr,
boolean createContainer) throws Exception {
ApplicationAttemptId attemptId =
Token<AMRMTokenIdentifier> appAttemptToken = null;
if (appTokenMgr != null) {
appAttemptToken = generateAMRMToken(attemptId, appTokenMgr);
SecretKey clientTokenKey = null;
if (clientToAMTokenMgr != null) {
clientTokenKey = clientToAMTokenMgr.createMasterKey(attemptId);
Credentials attemptCred = new Credentials();
ContainerId containerId = null;
if (createContainer) {
containerId = ContainerId.newContainerId(attemptId, 1);
storeAttempt(store, attemptId, containerId.toString(), appAttemptToken,
clientTokenKey, dispatcher);
return attemptId;
private void finishAppWithAttempts(RMState state, RMStateStore store,
TestDispatcher dispatcher, ApplicationAttemptId attemptId,
long submitTime, long startTime, int amExitStatus, long finishTime,
boolean createNewApp) throws Exception {
ApplicationId appId = attemptId.getApplicationId();
ApplicationStateData appStateNew = null;
if (createNewApp) {
ApplicationSubmissionContext context =
new ApplicationSubmissionContextPBImpl();
appStateNew = createAppState(context, submitTime, startTime, finishTime,
} else {
ApplicationStateData appState = state.getApplicationState().get(appId);
appStateNew = createAppState(appState.getApplicationSubmissionContext(),
submitTime, startTime, finishTime, true);
Container container = new ContainerPBImpl();
container.setId(ContainerId.newContainerId(attemptId, 1));
ApplicationAttemptStateData newAttemptState =
createFinishedAttempt(attemptId, container, startTime, amExitStatus);
updateAttempt(store, dispatcher, newAttemptState);
private void storeAppWithAttempts(RMStateStore store,
TestDispatcher dispatcher, ApplicationAttemptId attemptId,
long submitTime, long startTime) throws Exception {
storeAppWithAttempts(store, dispatcher, submitTime, startTime, null, null,
private void storeApp(RMStateStore store, TestDispatcher dispatcher,
ApplicationId appId, long submitTime, long startTime) throws Exception {
storeApp(store, appId, submitTime, startTime);
private void storeAppWithAttempts(RMStateStore store,
TestDispatcher dispatcher, long submitTime, long startTime,
AMRMTokenSecretManager appTokenMgr,
ClientToAMTokenSecretManagerInRM clientToAMTokenMgr,
ApplicationAttemptId attemptId, ApplicationAttemptId... attemptIds)
throws Exception {
ApplicationId appId = attemptId.getApplicationId();
storeApp(store, dispatcher, appId, submitTime, startTime);
storeAttempt(store, dispatcher, attemptId.toString(), appTokenMgr,
clientToAMTokenMgr, true);
for (ApplicationAttemptId attempt : attemptIds) {
storeAttempt(store, dispatcher, attempt.toString(), appTokenMgr,
clientToAMTokenMgr, true);
private static void removeApps(RMStateStore store,
Map<ApplicationId, ApplicationAttemptId[]> appWithAttempts) {
for (Map.Entry<ApplicationId, ApplicationAttemptId[]> entry :
appWithAttempts.entrySet()) {
RMApp mockApp = createMockAppForRemove(entry.getKey(), entry.getValue());
private static void verifyAppPathPath(RMStateStore store, ApplicationId appId,
int splitIndex) throws Exception {
String appIdStr = appId.toString();
String appParent = appIdStr.substring(0, appIdStr.length() - splitIndex);
String appPath = appIdStr.substring(appIdStr.length() - splitIndex);
String path = createPath(((ZKRMStateStore)store).znodeWorkingPath,
ZKRMStateStore.RM_APP_ROOT_HIERARCHIES, String.valueOf(splitIndex),
appParent, appPath);
assertTrue("Application with id " + appIdStr + " does not exist as per " +
"split in state store.", ((ZKRMStateStore)store).exists(path));
private static void verifyAppInHierarchicalPath(RMStateStore store,
String appId, int splitIdx) throws Exception {
String path = createPath(((ZKRMStateStore)store).znodeWorkingPath,
if (splitIdx != 0) {
path = createPath(path, ZKRMStateStore.RM_APP_ROOT_HIERARCHIES,
String.valueOf(splitIdx), appId.substring(0, appId.length() -
splitIdx), appId.substring(appId.length() - splitIdx));
} else {
path = createPath(path, appId);
assertTrue(appId + " should exist in path " + path,
private static void assertHierarchicalPaths(RMStateStore store,
Map<Integer, Integer> pathToApps) throws Exception {
for (Map.Entry<Integer, Integer> entry : pathToApps.entrySet()) {
String path = createPath(((ZKRMStateStore)store).znodeWorkingPath,
if (entry.getKey() != 0) {
path = createPath(path, ZKRMStateStore.RM_APP_ROOT_HIERARCHIES,
assertEquals("Number of childrens for path " + path,
(int) entry.getValue(),
// Test to verify storing of apps and app attempts in ZK state store with app
// node split index configured more than 0.
public void testAppNodeSplit() throws Exception {
TestZKRMStateStoreTester zkTester = new TestZKRMStateStoreTester();
long submitTime = System.currentTimeMillis();
long startTime = submitTime + 1234;
Configuration conf = new YarnConfiguration();
// Get store with app node split config set as 1.
RMStateStore store = zkTester.getRMStateStore(createConfForAppNodeSplit(1));
TestDispatcher dispatcher = new TestDispatcher();
// Create RM Context and app token manager.
RMContext rmContext = mock(RMContext.class);
AMRMTokenSecretManager appTokenMgr =
spy(new AMRMTokenSecretManager(conf, rmContext));
MasterKeyData masterKeyData = appTokenMgr.createNewMasterKey();
ClientToAMTokenSecretManagerInRM clientToAMTokenMgr =
new ClientToAMTokenSecretManagerInRM();
// Store app1.
ApplicationId appId1 = ApplicationId.newInstance(1352994193343L, 1);
ApplicationAttemptId attemptId1 =
ApplicationAttemptId.newInstance(appId1, 1);
ApplicationAttemptId attemptId2 =
ApplicationAttemptId.newInstance(appId1, 2);
storeAppWithAttempts(store, dispatcher, submitTime, startTime,
appTokenMgr, clientToAMTokenMgr, attemptId1, attemptId2);
// Store app2 with app id application_1352994193343_120213.
ApplicationId appId21 = ApplicationId.newInstance(1352994193343L, 120213);
storeApp(store, appId21, submitTime, startTime);
// Store another app which will be removed.
ApplicationId appIdRemoved = ApplicationId.newInstance(1352994193343L, 2);
ApplicationAttemptId attemptIdRemoved =
ApplicationAttemptId.newInstance(appIdRemoved, 1);
storeAppWithAttempts(store, dispatcher, submitTime, startTime,
null, null, attemptIdRemoved);
// Remove the app.
RMApp mockRemovedApp =
createMockAppForRemove(appIdRemoved, attemptIdRemoved);
// Close state store
// Load state store
store = zkTester.getRMStateStore(createConfForAppNodeSplit(1));
RMState state = store.loadState();
// Check if application_1352994193343_120213 (i.e. app2) exists in state
// store as per split index.
verifyAppPathPath(store, appId21, 1);
// Verify loaded apps and attempts based on the operations we did before
// reloading the state store.
verifyLoadedApp(state, appId1, submitTime, startTime, 0, false,
Lists.newArrayList(attemptId1, attemptId2), Lists.newArrayList(-1000,
-1000), Lists.newArrayList((FinalApplicationStatus) null, null));
// Update app state for app1.
finishAppWithAttempts(state, store, dispatcher, attemptId2, submitTime,
startTime, 100, 1234, false);
// Test updating app/attempt for app whose initial state is not saved
ApplicationId dummyAppId = ApplicationId.newInstance(1234, 10);
ApplicationAttemptId dummyAttemptId =
ApplicationAttemptId.newInstance(dummyAppId, 6);
finishAppWithAttempts(state, store, dispatcher, dummyAttemptId, submitTime,
startTime, 111, 1234, true);
// Close the store
// Check updated application state.
store = zkTester.getRMStateStore(createConfForAppNodeSplit(1));
RMState newRMState = store.loadState();
verifyLoadedApp(newRMState, dummyAppId, submitTime, startTime, 1234, true,
Lists.newArrayList(dummyAttemptId), Lists.newArrayList(111),
verifyLoadedApp(newRMState, appId1, submitTime, startTime, 1234, true,
Lists.newArrayList(attemptId1, attemptId2),
Lists.newArrayList(-1000, 100), Lists.newArrayList(null,
// assert store is in expected state after everything is cleaned
assertTrue("Store is not in expected state", zkTester.isFinalStateValid());
// Test to verify storing of apps and app attempts in ZK state store with app
// node split index config changing across restarts.
public void testAppNodeSplitChangeAcrossRestarts() throws Exception {
TestZKRMStateStoreTester zkTester = new TestZKRMStateStoreTester();
long submitTime = System.currentTimeMillis();
long startTime = submitTime + 1234;
Configuration conf = new YarnConfiguration();
// Create store with app node split set as 1.
RMStateStore store = zkTester.getRMStateStore(createConfForAppNodeSplit(1));
TestDispatcher dispatcher = new TestDispatcher();
RMContext rmContext = mock(RMContext.class);
AMRMTokenSecretManager appTokenMgr =
spy(new AMRMTokenSecretManager(conf, rmContext));
MasterKeyData masterKeyData = appTokenMgr.createNewMasterKey();
ClientToAMTokenSecretManagerInRM clientToAMTokenMgr =
new ClientToAMTokenSecretManagerInRM();
// Store app1 with 2 attempts.
ApplicationId appId1 = ApplicationId.newInstance(1442994194053L, 1);
ApplicationAttemptId attemptId1 =
ApplicationAttemptId.newInstance(appId1, 1);
ApplicationAttemptId attemptId2 =
ApplicationAttemptId.newInstance(appId1, 2);
storeAppWithAttempts(store, dispatcher, submitTime, startTime,
appTokenMgr, clientToAMTokenMgr, attemptId1, attemptId2);
// Store app2 and associated attempt.
ApplicationId appId11 = ApplicationId.newInstance(1442994194053L, 2);
ApplicationAttemptId attemptId11 =
ApplicationAttemptId.newInstance(appId11, 1);
storeAppWithAttempts(store, dispatcher, attemptId11, submitTime, startTime);
// Close state store
// Load state store with app node split config of 2.
store = zkTester.getRMStateStore(createConfForAppNodeSplit(2));
RMState state = store.loadState();
ApplicationId appId21 = ApplicationId.newInstance(1442994194053L, 120213);
storeApp(store, dispatcher, appId21, submitTime, startTime);
// Check if app is loaded correctly despite change in split index.
verifyLoadedApp(state, appId1, submitTime, startTime, 0, false,
Lists.newArrayList(attemptId1, attemptId2), Lists.newArrayList(-1000,
-1000), Lists.newArrayList((FinalApplicationStatus) null, null));
// Finish app/attempt state
finishAppWithAttempts(state, store, dispatcher, attemptId2, submitTime,
startTime, 100, 1234, false);
// Test updating app/attempt for app whose initial state is not saved
ApplicationId dummyAppId = ApplicationId.newInstance(1234, 10);
ApplicationAttemptId dummyAttemptId =
ApplicationAttemptId.newInstance(dummyAppId, 6);
finishAppWithAttempts(state, store, dispatcher, dummyAttemptId, submitTime,
startTime, 111, 1234, true);
// Close the store
// Load state store this time with split index of 0.
store = zkTester.getRMStateStore(createConfForAppNodeSplit(0));
state = store.loadState();
assertEquals("Number of Apps loaded should be 4.", 4,
verifyLoadedApp(state, appId1, submitTime, startTime, 1234, true,
Lists.newArrayList(attemptId1, attemptId2), Lists.newArrayList(-1000,
100), Lists.newArrayList(null, FinalApplicationStatus.SUCCEEDED));
// Remove attempt1
ApplicationId appId31 = ApplicationId.newInstance(1442994195071L, 45);
storeApp(store, dispatcher, appId31, submitTime, startTime);
// Close state store.
// Load state store with split index of 3.
store = zkTester.getRMStateStore(createConfForAppNodeSplit(3));
state = store.loadState();
assertEquals("Number of apps loaded should be 5.", 5,
verifyLoadedApp(state, dummyAppId, submitTime, startTime, 1234, true,
Lists.newArrayList(dummyAttemptId), Lists.newArrayList(111),
verifyLoadedApp(state, appId31, submitTime, startTime, 0, false, null);
verifyLoadedApp(state, appId21, submitTime, startTime, 0, false, null);
verifyLoadedApp(state, appId11, submitTime, startTime, 0, false,
Lists.newArrayList(attemptId11), Lists.newArrayList(-1000),
Lists.newArrayList((FinalApplicationStatus) null));
verifyLoadedApp(state, appId1, submitTime, startTime, 1234, true,
Lists.newArrayList(attemptId2), Lists.newArrayList(100),
// Store another app.
ApplicationId appId41 = ApplicationId.newInstance(1442994195087L, 1);
storeApp(store, dispatcher, appId41, submitTime, startTime);
// Check how many apps exist in each of the hierarchy based paths. 0 paths
// should exist in "HIERARCHIES/4" path as app split index was never set
// as 4 in tests above.
assertHierarchicalPaths(store, ImmutableMap.of(0, 2, 1, 1, 2, 2,
3, 1, 4, 0));
verifyAppInHierarchicalPath(store, "application_1442994195087_0001", 3);
ApplicationId appId71 = ApplicationId.newInstance(1442994195087L, 7);
//storeApp(store, dispatcher, appId71, submitTime, startTime);
storeApp(store, appId71, submitTime, startTime);
ApplicationAttemptId attemptId71 =
ApplicationAttemptId.newInstance(appId71, 1);
storeAttempt(store, ApplicationAttemptId.newInstance(appId71, 1),
ContainerId.newContainerId(attemptId71, 1).toString(), null, null,
// Remove applications.
removeApps(store, ImmutableMap.of(appId11, new ApplicationAttemptId[]
{attemptId11}, appId71, new ApplicationAttemptId[] {attemptId71},
appId41, new ApplicationAttemptId[0], appId31,
new ApplicationAttemptId[0], appId21, new ApplicationAttemptId[0]));
removeApps(store, ImmutableMap.of(dummyAppId,
new ApplicationAttemptId[] {dummyAttemptId}, appId1,
new ApplicationAttemptId[] {attemptId1, attemptId2}));
// Load state store with split index of 3 again. As all apps have been
// removed nothing should be loaded back.
store = zkTester.getRMStateStore(createConfForAppNodeSplit(3));
state = store.loadState();
assertEquals("Number of apps loaded should be 0.", 0,
// Close the state store.
Reference in New Issue
Block a user