YARN-2962. ZKRMStateStore: Limit the number of znodes under a znode (Contributed by Varun Sexena via Daniel Templeton)

This commit is contained in:
Daniel Templeton 2017-04-28 13:26:36 -07:00
parent fdf5192bbb
commit 2e52789edf
6 changed files with 1013 additions and 116 deletions

View File

@ -568,6 +568,10 @@ public class YarnConfiguration extends Configuration {
public static final String RM_ZK_TIMEOUT_MS = RM_ZK_PREFIX + "timeout-ms"; public static final String RM_ZK_TIMEOUT_MS = RM_ZK_PREFIX + "timeout-ms";
public static final int DEFAULT_RM_ZK_TIMEOUT_MS = 10000; public static final int DEFAULT_RM_ZK_TIMEOUT_MS = 10000;
public static final String ZK_APPID_NODE_SPLIT_INDEX =
RM_ZK_PREFIX + "appid-node.split-index";
public static final int DEFAULT_ZK_APPID_NODE_SPLIT_INDEX = 0;
public static final String RM_ZK_ACL = RM_ZK_PREFIX + "acl"; public static final String RM_ZK_ACL = RM_ZK_PREFIX + "acl";
public static final String DEFAULT_RM_ZK_ACL = "world:anyone:rwcda"; public static final String DEFAULT_RM_ZK_ACL = "world:anyone:rwcda";

View File

@ -609,7 +609,27 @@
</property> </property>
<property> <property>
<description>Name of the cluster. In an HA setting, <description>Index at which last section of application id (with each section
separated by _ in application id) will be split so that application znode
stored in zookeeper RM state store will be stored as two different znodes
(parent-child). Split is done from the end.
For instance, with no split, appid znode will be of the form
application_1352994193343_0001. If the value of this config is 1, the
appid znode will be broken into two parts application_1352994193343_000
and 1 respectively with former being the parent node.
application_1352994193343_0002 will then be stored as 2 under the parent
node application_1352994193343_000. This config can take values from 0 to 4.
0 means there will be no split. If configuration value is outside this
range, it will be treated as config value of 0(i.e. no split). A value
larger than 0 (up to 4) should be configured if you are storing a large number
of apps in ZK based RM state store and state store operations are failing due to
LenError in Zookeeper.</description>
<name>yarn.resourcemanager.zk-appid-node.split-index</name>
<value>0</value>
</property>
<property>
<description>Name of the cluster. In a HA setting,
this is used to ensure the RM participates in leader this is used to ensure the RM participates in leader
election for this cluster and ensures it does not affect election for this cluster and ensures it does not affect
other clusters</description> other clusters</description>

View File

@ -57,6 +57,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.Ap
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.ApplicationStateDataPBImpl; import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.ApplicationStateDataPBImpl;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.EpochPBImpl; import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.EpochPBImpl;
import org.apache.zookeeper.CreateMode; import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.ZooDefs; import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.data.ACL; import org.apache.zookeeper.data.ACL;
import org.apache.zookeeper.data.Id; import org.apache.zookeeper.data.Id;
@ -72,6 +73,8 @@ import java.security.NoSuchAlgorithmException;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.HashMap; import java.util.HashMap;
import java.util.List; import java.util.List;
import java.util.Map;
import java.util.Set;
/** /**
* {@link RMStateStore} implementation backed by ZooKeeper. * {@link RMStateStore} implementation backed by ZooKeeper.
@ -82,6 +85,31 @@ import java.util.List;
* |--- EPOCH_NODE * |--- EPOCH_NODE
* |--- RM_ZK_FENCING_LOCK * |--- RM_ZK_FENCING_LOCK
* |--- RM_APP_ROOT * |--- RM_APP_ROOT
* | |----- HIERARCHIES
* | | |----- 1
* | | | |----- (#ApplicationId barring last character)
* | | | | |----- (#Last character of ApplicationId)
* | | | | | |----- (#ApplicationAttemptIds)
* | | | ....
* | | |
* | | |----- 2
* | | | |----- (#ApplicationId barring last 2 characters)
* | | | | |----- (#Last 2 characters of ApplicationId)
* | | | | | |----- (#ApplicationAttemptIds)
* | | | ....
* | | |
* | | |----- 3
* | | | |----- (#ApplicationId barring last 3 characters)
* | | | | |----- (#Last 3 characters of ApplicationId)
* | | | | | |----- (#ApplicationAttemptIds)
* | | | ....
* | | |
* | | |----- 4
* | | | |----- (#ApplicationId barring last 4 characters)
* | | | | |----- (#Last 4 characters of ApplicationId)
* | | | | | |----- (#ApplicationAttemptIds)
* | | | ....
* | | |
* | |----- (#ApplicationId1) * | |----- (#ApplicationId1)
* | | |----- (#ApplicationAttemptIds) * | | |----- (#ApplicationAttemptIds)
* | | * | |
@ -121,6 +149,7 @@ import java.util.List;
@Unstable @Unstable
public class ZKRMStateStore extends RMStateStore { public class ZKRMStateStore extends RMStateStore {
private static final Log LOG = LogFactory.getLog(ZKRMStateStore.class); private static final Log LOG = LogFactory.getLog(ZKRMStateStore.class);
private static final String RM_DELEGATION_TOKENS_ROOT_ZNODE_NAME = private static final String RM_DELEGATION_TOKENS_ROOT_ZNODE_NAME =
"RMDelegationTokensRoot"; "RMDelegationTokensRoot";
private static final String RM_DT_SEQUENTIAL_NUMBER_ZNODE_NAME = private static final String RM_DT_SEQUENTIAL_NUMBER_ZNODE_NAME =
@ -129,12 +158,15 @@ public class ZKRMStateStore extends RMStateStore {
"RMDTMasterKeysRoot"; "RMDTMasterKeysRoot";
@VisibleForTesting @VisibleForTesting
public static final String ROOT_ZNODE_NAME = "ZKRMStateRoot"; public static final String ROOT_ZNODE_NAME = "ZKRMStateRoot";
protected static final Version CURRENT_VERSION_INFO = protected static final Version CURRENT_VERSION_INFO = Version
Version.newInstance(1, 3); .newInstance(2, 0);
@VisibleForTesting
public static final String RM_APP_ROOT_HIERARCHIES = "HIERARCHIES";
/* Znode paths */ /* Znode paths */
private String zkRootNodePath; private String zkRootNodePath;
private String rmAppRoot; private String rmAppRoot;
private Map<Integer, String> rmAppRootHierarchies;
private String rmDTSecretManagerRoot; private String rmDTSecretManagerRoot;
private String dtMasterKeysRootPath; private String dtMasterKeysRootPath;
private String delegationTokensRootPath; private String delegationTokensRootPath;
@ -144,6 +176,7 @@ public class ZKRMStateStore extends RMStateStore {
@VisibleForTesting @VisibleForTesting
protected String znodeWorkingPath; protected String znodeWorkingPath;
private int appIdNodeSplitIndex = 0;
/* Fencing related variables */ /* Fencing related variables */
private static final String FENCING_LOCK = "RM_ZK_FENCING_LOCK"; private static final String FENCING_LOCK = "RM_ZK_FENCING_LOCK";
@ -165,6 +198,27 @@ public class ZKRMStateStore extends RMStateStore {
@VisibleForTesting @VisibleForTesting
protected CuratorFramework curatorFramework; protected CuratorFramework curatorFramework;
/*
* Indicates different app attempt state store operations.
*/
private enum AppAttemptOp {
STORE,
UPDATE,
REMOVE
};
/**
* Encapsulates full app node path and corresponding split index.
*/
private final static class AppNodeSplitInfo {
private final String path;
private final int splitIndex;
AppNodeSplitInfo(String path, int splitIndex) {
this.path = path;
this.splitIndex = splitIndex;
}
}
/** /**
* Given the {@link Configuration} and {@link ACL}s used (sourceACLs) for * Given the {@link Configuration} and {@link ACL}s used (sourceACLs) for
* ZooKeeper access, construct the {@link ACL}s for the store's root node. * ZooKeeper access, construct the {@link ACL}s for the store's root node.
@ -212,11 +266,30 @@ public class ZKRMStateStore extends RMStateStore {
conf.get(YarnConfiguration.ZK_RM_STATE_STORE_PARENT_PATH, conf.get(YarnConfiguration.ZK_RM_STATE_STORE_PARENT_PATH,
YarnConfiguration.DEFAULT_ZK_RM_STATE_STORE_PARENT_PATH); YarnConfiguration.DEFAULT_ZK_RM_STATE_STORE_PARENT_PATH);
zkRootNodePath = getNodePath(znodeWorkingPath, ROOT_ZNODE_NAME); zkRootNodePath = getNodePath(znodeWorkingPath, ROOT_ZNODE_NAME);
fencingNodePath = getNodePath(zkRootNodePath, FENCING_LOCK);
rmAppRoot = getNodePath(zkRootNodePath, RM_APP_ROOT); rmAppRoot = getNodePath(zkRootNodePath, RM_APP_ROOT);
String hierarchiesPath = getNodePath(rmAppRoot, RM_APP_ROOT_HIERARCHIES);
rmAppRootHierarchies = new HashMap<>(5);
rmAppRootHierarchies.put(0, rmAppRoot);
for (int splitIndex = 1; splitIndex <= 4; splitIndex++) {
rmAppRootHierarchies.put(splitIndex,
getNodePath(hierarchiesPath, Integer.toString(splitIndex)));
}
fencingNodePath = getNodePath(zkRootNodePath, FENCING_LOCK);
zkSessionTimeout = conf.getInt(YarnConfiguration.RM_ZK_TIMEOUT_MS, zkSessionTimeout = conf.getInt(YarnConfiguration.RM_ZK_TIMEOUT_MS,
YarnConfiguration.DEFAULT_RM_ZK_TIMEOUT_MS); YarnConfiguration.DEFAULT_RM_ZK_TIMEOUT_MS);
appIdNodeSplitIndex =
conf.getInt(YarnConfiguration.ZK_APPID_NODE_SPLIT_INDEX,
YarnConfiguration.DEFAULT_ZK_APPID_NODE_SPLIT_INDEX);
if (appIdNodeSplitIndex < 1 || appIdNodeSplitIndex > 4) {
LOG.info("Invalid value " + appIdNodeSplitIndex + " for config " +
YarnConfiguration.ZK_APPID_NODE_SPLIT_INDEX + " specified. " +
"Resetting it to " +
YarnConfiguration.DEFAULT_ZK_APPID_NODE_SPLIT_INDEX);
appIdNodeSplitIndex = YarnConfiguration.DEFAULT_ZK_APPID_NODE_SPLIT_INDEX;
}
zkAcl = RMZKUtils.getZKAcls(conf); zkAcl = RMZKUtils.getZKAcls(conf);
if (HAUtil.isHAEnabled(conf)) { if (HAUtil.isHAEnabled(conf)) {
@ -269,6 +342,10 @@ public class ZKRMStateStore extends RMStateStore {
verifyActiveStatusThread.start(); verifyActiveStatusThread.start();
} }
create(rmAppRoot); create(rmAppRoot);
create(getNodePath(rmAppRoot, RM_APP_ROOT_HIERARCHIES));
for (int splitIndex = 1; splitIndex <= 4; splitIndex++) {
create(rmAppRootHierarchies.get(splitIndex));
}
create(rmDTSecretManagerRoot); create(rmDTSecretManagerRoot);
create(dtMasterKeysRootPath); create(dtMasterKeysRootPath);
create(delegationTokensRootPath); create(delegationTokensRootPath);
@ -524,42 +601,64 @@ public class ZKRMStateStore extends RMStateStore {
} }
} }
private synchronized void loadRMAppState(RMState rmState) throws Exception { private void loadRMAppStateFromAppNode(RMState rmState, String appNodePath,
List<String> childNodes = getChildren(rmAppRoot); String appIdStr) throws Exception {
byte[] appData = getData(appNodePath);
for (String childNodeName : childNodes) {
String childNodePath = getNodePath(rmAppRoot, childNodeName);
byte[] childData = getData(childNodePath);
if (childNodeName.startsWith(ApplicationId.appIdStrPrefix)) {
// application
if (LOG.isDebugEnabled()) { if (LOG.isDebugEnabled()) {
LOG.debug("Loading application from znode: " + childNodeName); LOG.debug("Loading application from znode: " + appNodePath);
} }
ApplicationId appId = ApplicationId.fromString(appIdStr);
ApplicationId appId = ApplicationId.fromString(childNodeName); ApplicationStateDataPBImpl appState = new ApplicationStateDataPBImpl(
ApplicationStateDataPBImpl appState = ApplicationStateDataProto.parseFrom(appData));
new ApplicationStateDataPBImpl(
ApplicationStateDataProto.parseFrom(childData));
if (!appId.equals( if (!appId.equals(
appState.getApplicationSubmissionContext().getApplicationId())) { appState.getApplicationSubmissionContext().getApplicationId())) {
throw new YarnRuntimeException("The child node name is different " throw new YarnRuntimeException("The node name is different from the " +
+ "from the application id"); "application id");
}
rmState.appState.put(appId, appState);
loadApplicationAttemptState(appState, appNodePath);
} }
rmState.appState.put(appId, appState); private synchronized void loadRMAppState(RMState rmState) throws Exception {
loadApplicationAttemptState(appState, appId); for (int splitIndex = 0; splitIndex <= 4; splitIndex++) {
String appRoot = rmAppRootHierarchies.get(splitIndex);
if (appRoot == null) {
continue;
}
List<String> childNodes = getChildren(appRoot);
boolean appNodeFound = false;
for (String childNodeName : childNodes) {
if (childNodeName.startsWith(ApplicationId.appIdStrPrefix)) {
appNodeFound = true;
if (splitIndex == 0) {
loadRMAppStateFromAppNode(rmState,
getNodePath(appRoot, childNodeName), childNodeName);
} else {
// If AppId Node is partitioned.
String parentNodePath = getNodePath(appRoot, childNodeName);
List<String> leafNodes = getChildren(parentNodePath);
for (String leafNodeName : leafNodes) {
String appIdStr = childNodeName + leafNodeName;
loadRMAppStateFromAppNode(rmState,
getNodePath(parentNodePath, leafNodeName), appIdStr);
}
}
} else { } else {
LOG.info("Unknown child node with name: " + childNodeName); LOG.info("Unknown child node with name: " + childNodeName);
} }
} }
if (splitIndex != appIdNodeSplitIndex && !appNodeFound) {
// If no loaded app exists for a particular split index and the split
// index for which apps are being loaded is not the one configured, then
// we do not need to keep track of this hierarchy for storing/updating/
// removing app/app attempt znodes.
rmAppRootHierarchies.remove(splitIndex);
}
}
} }
private void loadApplicationAttemptState(ApplicationStateData appState, private void loadApplicationAttemptState(ApplicationStateData appState,
ApplicationId appId) String appPath) throws Exception {
throws Exception {
String appPath = getNodePath(rmAppRoot, appId.toString());
List<String> attempts = getChildren(appPath); List<String> attempts = getChildren(appPath);
for (String attemptIDStr : attempts) { for (String attemptIDStr : attempts) {
@ -574,14 +673,68 @@ public class ZKRMStateStore extends RMStateStore {
appState.attempts.put(attemptState.getAttemptId(), attemptState); appState.attempts.put(attemptState.getAttemptId(), attemptState);
} }
} }
LOG.debug("Done loading applications from ZK state store"); LOG.debug("Done loading applications from ZK state store");
} }
/**
* Get parent app node path based on full path and split index supplied.
* @param appIdPath App id path for which parent needs to be returned.
* @param splitIndex split index.
* @return parent app node path.
*/
private String getSplitAppNodeParent(String appIdPath, int splitIndex) {
// Calculated as string upto index (appIdPath Length - split index - 1). We
// deduct 1 to exclude path separator.
return appIdPath.substring(0, appIdPath.length() - splitIndex - 1);
}
/**
* Checks if parent app node has no leaf nodes and if it does not have,
* removes it. Called while removing application.
* @param appIdPath path of app id to be removed.
* @param splitIndex split index.
* @throws Exception if any problem occurs while performing ZK operation.
*/
private void checkRemoveParentAppNode(String appIdPath, int splitIndex)
throws Exception {
if (splitIndex != 0) {
String parentAppNode = getSplitAppNodeParent(appIdPath, splitIndex);
List<String> children = null;
try {
children = getChildren(parentAppNode);
} catch (KeeperException.NoNodeException ke) {
// It should be fine to swallow this exception as the parent app node we
// intend to delete is already deleted.
if (LOG.isDebugEnabled()) {
LOG.debug("Unable to remove app parent node " + parentAppNode +
" as it does not exist.");
}
return;
}
// No apps stored under parent path.
if (children != null && children.isEmpty()) {
try {
safeDelete(parentAppNode);
if (LOG.isDebugEnabled()) {
LOG.debug("No leaf app node exists. Removing parent node " +
parentAppNode);
}
} catch (KeeperException.NotEmptyException ke) {
// It should be fine to swallow this exception as the parent app node
// has to be deleted only if it has no children. And this node has.
if (LOG.isDebugEnabled()) {
LOG.debug("Unable to remove app parent node " + parentAppNode +
" as it has children.");
}
}
}
}
}
@Override @Override
public synchronized void storeApplicationStateInternal(ApplicationId appId, public synchronized void storeApplicationStateInternal(ApplicationId appId,
ApplicationStateData appStateDataPB) throws Exception { ApplicationStateData appStateDataPB) throws Exception {
String nodeCreatePath = getNodePath(rmAppRoot, appId.toString()); String nodeCreatePath = getLeafAppIdNodePath(appId.toString(), true);
if (LOG.isDebugEnabled()) { if (LOG.isDebugEnabled()) {
LOG.debug("Storing info for app: " + appId + " at: " + nodeCreatePath); LOG.debug("Storing info for app: " + appId + " at: " + nodeCreatePath);
@ -596,7 +749,26 @@ public class ZKRMStateStore extends RMStateStore {
protected synchronized void updateApplicationStateInternal( protected synchronized void updateApplicationStateInternal(
ApplicationId appId, ApplicationStateData appStateDataPB) ApplicationId appId, ApplicationStateData appStateDataPB)
throws Exception { throws Exception {
String nodeUpdatePath = getNodePath(rmAppRoot, appId.toString()); String nodeUpdatePath = getLeafAppIdNodePath(appId.toString(), false);
boolean pathExists = true;
// Look for paths based on other split indices if path as per split index
// does not exist.
if (!exists(nodeUpdatePath)) {
AppNodeSplitInfo alternatePathInfo = getAlternatePath(appId.toString());
if (alternatePathInfo != null) {
nodeUpdatePath = alternatePathInfo.path;
} else {
// No alternate path exists. Create path as per configured split index.
pathExists = false;
if (appIdNodeSplitIndex != 0) {
String rootNode =
getSplitAppNodeParent(nodeUpdatePath, appIdNodeSplitIndex);
if (!exists(rootNode)) {
safeCreate(rootNode, null, zkAcl, CreateMode.PERSISTENT);
}
}
}
}
if (LOG.isDebugEnabled()) { if (LOG.isDebugEnabled()) {
LOG.debug("Storing final state info for app: " + appId + " at: " LOG.debug("Storing final state info for app: " + appId + " at: "
@ -605,34 +777,79 @@ public class ZKRMStateStore extends RMStateStore {
byte[] appStateData = appStateDataPB.getProto().toByteArray(); byte[] appStateData = appStateDataPB.getProto().toByteArray();
if (exists(nodeUpdatePath)) { if (pathExists) {
safeSetData(nodeUpdatePath, appStateData, -1); safeSetData(nodeUpdatePath, appStateData, -1);
} else { } else {
safeCreate(nodeUpdatePath, appStateData, zkAcl, safeCreate(nodeUpdatePath, appStateData, zkAcl, CreateMode.PERSISTENT);
CreateMode.PERSISTENT);
if (LOG.isDebugEnabled()) { if (LOG.isDebugEnabled()) {
LOG.debug(appId + " znode didn't exist. Created a new znode to" LOG.debug("Path " + nodeUpdatePath + " for " + appId + " didn't " +
+ " update the application state."); "exist. Creating a new znode to update the application state.");
} }
} }
} }
/*
* Handles store, update and remove application attempt state store
* operations.
*/
private void handleApplicationAttemptStateOp(
ApplicationAttemptId appAttemptId,
ApplicationAttemptStateData attemptStateDataPB, AppAttemptOp operation)
throws Exception {
String appId = appAttemptId.getApplicationId().toString();
String appDirPath = getLeafAppIdNodePath(appId, false);
// Look for paths based on other split indices.
if (!exists(appDirPath)) {
AppNodeSplitInfo alternatePathInfo = getAlternatePath(appId);
if (alternatePathInfo == null) {
if (operation == AppAttemptOp.REMOVE) {
// Unexpected. Assume that app attempt has been deleted.
return;
} else { // Store or Update operation
throw new YarnRuntimeException("Unexpected Exception. App node for " +
"app " + appId + " not found");
}
} else {
appDirPath = alternatePathInfo.path;
}
}
String path = getNodePath(appDirPath, appAttemptId.toString());
byte[] attemptStateData = (attemptStateDataPB == null) ? null :
attemptStateDataPB.getProto().toByteArray();
if (LOG.isDebugEnabled()) {
LOG.debug(operation + " info for attempt: " + appAttemptId + " at: "
+ path);
}
switch (operation) {
case UPDATE:
if (exists(path)) {
safeSetData(path, attemptStateData, -1);
} else {
safeCreate(path, attemptStateData, zkAcl, CreateMode.PERSISTENT);
if (LOG.isDebugEnabled()) {
LOG.debug("Path " + path + " for " + appAttemptId + " didn't exist." +
" Created a new znode to update the application attempt state.");
}
}
break;
case STORE:
safeCreate(path, attemptStateData, zkAcl, CreateMode.PERSISTENT);
break;
case REMOVE:
safeDelete(path);
break;
default:
break;
}
}
@Override @Override
protected synchronized void storeApplicationAttemptStateInternal( protected synchronized void storeApplicationAttemptStateInternal(
ApplicationAttemptId appAttemptId, ApplicationAttemptId appAttemptId,
ApplicationAttemptStateData attemptStateDataPB) ApplicationAttemptStateData attemptStateDataPB)
throws Exception { throws Exception {
String appDirPath = getNodePath(rmAppRoot, handleApplicationAttemptStateOp(appAttemptId, attemptStateDataPB,
appAttemptId.getApplicationId().toString()); AppAttemptOp.STORE);
String nodeCreatePath = getNodePath(appDirPath, appAttemptId.toString());
if (LOG.isDebugEnabled()) {
LOG.debug("Storing info for attempt: " + appAttemptId + " at: "
+ nodeCreatePath);
}
byte[] attemptStateData = attemptStateDataPB.getProto().toByteArray();
safeCreate(nodeCreatePath, attemptStateData, zkAcl, CreateMode.PERSISTENT);
} }
@Override @Override
@ -640,65 +857,73 @@ public class ZKRMStateStore extends RMStateStore {
ApplicationAttemptId appAttemptId, ApplicationAttemptId appAttemptId,
ApplicationAttemptStateData attemptStateDataPB) ApplicationAttemptStateData attemptStateDataPB)
throws Exception { throws Exception {
String appIdStr = appAttemptId.getApplicationId().toString(); handleApplicationAttemptStateOp(appAttemptId, attemptStateDataPB,
String appAttemptIdStr = appAttemptId.toString(); AppAttemptOp.UPDATE);
String appDirPath = getNodePath(rmAppRoot, appIdStr);
String nodeUpdatePath = getNodePath(appDirPath, appAttemptIdStr);
if (LOG.isDebugEnabled()) {
LOG.debug("Storing final state info for attempt: " + appAttemptIdStr
+ " at: " + nodeUpdatePath);
}
byte[] attemptStateData = attemptStateDataPB.getProto().toByteArray();
if (exists(nodeUpdatePath)) {
safeSetData(nodeUpdatePath, attemptStateData, -1);
} else {
safeCreate(nodeUpdatePath, attemptStateData, zkAcl,
CreateMode.PERSISTENT);
if (LOG.isDebugEnabled()) {
LOG.debug(appAttemptId + " znode didn't exist. Created a new znode to"
+ " update the application attempt state.");
}
}
} }
@Override @Override
protected synchronized void removeApplicationAttemptInternal( protected synchronized void removeApplicationAttemptInternal(
ApplicationAttemptId appAttemptId) throws Exception { ApplicationAttemptId appAttemptId) throws Exception {
String appId = appAttemptId.getApplicationId().toString(); handleApplicationAttemptStateOp(appAttemptId, null, AppAttemptOp.REMOVE);
String appIdRemovePath = getNodePath(rmAppRoot, appId);
String attemptIdRemovePath =
getNodePath(appIdRemovePath, appAttemptId.toString());
if (LOG.isDebugEnabled()) {
LOG.debug("Removing info for attempt: " + appAttemptId + " at: "
+ attemptIdRemovePath);
}
safeDelete(attemptIdRemovePath);
} }
@Override @Override
protected synchronized void removeApplicationStateInternal( protected synchronized void removeApplicationStateInternal(
ApplicationStateData appState) throws Exception { ApplicationStateData appState) throws Exception {
String appId = appState.getApplicationSubmissionContext().getApplicationId() removeApp(appState.getApplicationSubmissionContext().
.toString(); getApplicationId().toString(), true, appState.attempts.keySet());
String appIdRemovePath = getNodePath(rmAppRoot, appId);
if (LOG.isDebugEnabled()) {
LOG.debug("Removing info for app: " + appId + " at: " + appIdRemovePath
+ " and its attempts.");
} }
for (ApplicationAttemptId attemptId : appState.attempts.keySet()) { private void removeApp(String removeAppId) throws Exception {
removeApp(removeAppId, false, null);
}
/**
* Remove application node and its attempt nodes.
*
* @param removeAppId Application Id to be removed.
* @param safeRemove Flag indicating if application and attempt nodes have to
* be removed safely under a fencing or not.
* @param attempts list of attempts to be removed associated with this app.
* Ignored if safeRemove flag is false as we recursively delete all the
* child nodes directly.
* @throws Exception if any exception occurs during ZK operation.
*/
private void removeApp(String removeAppId, boolean safeRemove,
Set<ApplicationAttemptId> attempts) throws Exception {
String appIdRemovePath = getLeafAppIdNodePath(removeAppId, false);
int splitIndex = appIdNodeSplitIndex;
// Look for paths based on other split indices if path as per configured
// split index does not exist.
if (!exists(appIdRemovePath)) {
AppNodeSplitInfo alternatePathInfo = getAlternatePath(removeAppId);
if (alternatePathInfo != null) {
appIdRemovePath = alternatePathInfo.path;
splitIndex = alternatePathInfo.splitIndex;
} else {
// Alternate path not found so return.
return;
}
}
if (safeRemove) {
if (LOG.isDebugEnabled()) {
LOG.debug("Removing info for app: " + removeAppId + " at: " +
appIdRemovePath + " and its attempts.");
}
if (attempts != null) {
for (ApplicationAttemptId attemptId : attempts) {
String attemptRemovePath = String attemptRemovePath =
getNodePath(appIdRemovePath, attemptId.toString()); getNodePath(appIdRemovePath, attemptId.toString());
safeDelete(attemptRemovePath); safeDelete(attemptRemovePath);
} }
}
safeDelete(appIdRemovePath); safeDelete(appIdRemovePath);
} else {
curatorFramework.delete().deletingChildrenIfNeeded().
forPath(appIdRemovePath);
}
// Check if we should remove the parent app node as well.
checkRemoveParentAppNode(appIdRemovePath, splitIndex);
} }
@Override @Override
@ -820,8 +1045,7 @@ public class ZKRMStateStore extends RMStateStore {
@Override @Override
public synchronized void removeApplication(ApplicationId removeAppId) public synchronized void removeApplication(ApplicationId removeAppId)
throws Exception { throws Exception {
String appIdRemovePath = getNodePath(rmAppRoot, removeAppId.toString()); removeApp(removeAppId.toString());
delete(appIdRemovePath);
} }
@VisibleForTesting @VisibleForTesting
@ -920,6 +1144,79 @@ public class ZKRMStateStore extends RMStateStore {
} }
} }
/**
* Get alternate path for app id if path according to configured split index
* does not exist. We look for path based on all possible split indices.
* @param appId
* @return a {@link AppNodeSplitInfo} object containing the path and split
* index if it exists, null otherwise.
* @throws Exception if any problem occurs while performing ZK operation.
*/
private AppNodeSplitInfo getAlternatePath(String appId) throws Exception {
for (Map.Entry<Integer, String> entry : rmAppRootHierarchies.entrySet()) {
// Look for other paths
int splitIndex = entry.getKey();
if (splitIndex != appIdNodeSplitIndex) {
String alternatePath =
getLeafAppIdNodePath(appId, entry.getValue(), splitIndex, false);
if (exists(alternatePath)) {
return new AppNodeSplitInfo(alternatePath, splitIndex);
}
}
}
return null;
}
/**
* Returns leaf app node path based on app id and passed split index. If the
* passed flag createParentIfNotExists is true, also creates the parent app
* node if it does not exist.
* @param appId application id.
* @param rootNode app root node based on split index.
* @param appIdNodeSplitIdx split index.
* @param createParentIfNotExists flag which determines if parent app node
* needs to be created(as per split) if it does not exist.
* @return leaf app node path.
* @throws Exception if any problem occurs while performing ZK operation.
*/
private String getLeafAppIdNodePath(String appId, String rootNode,
int appIdNodeSplitIdx, boolean createParentIfNotExists) throws Exception {
if (appIdNodeSplitIdx == 0) {
return getNodePath(rootNode, appId);
}
String nodeName = appId;
int splitIdx = nodeName.length() - appIdNodeSplitIdx;
String rootNodePath =
getNodePath(rootNode, nodeName.substring(0, splitIdx));
if (createParentIfNotExists && !exists(rootNodePath)) {
try {
safeCreate(rootNodePath, null, zkAcl, CreateMode.PERSISTENT);
} catch (KeeperException.NodeExistsException e) {
if (LOG.isDebugEnabled()) {
LOG.debug("Unable to create app parent node " + rootNodePath +
" as it already exists.");
}
}
}
return getNodePath(rootNodePath, nodeName.substring(splitIdx));
}
/**
* Returns leaf app node path based on app id and configured split index. If
* the passed flag createParentIfNotExists is true, also creates the parent
* app node if it does not exist.
* @param appId application id.
* @param createParentIfNotExists flag which determines if parent app node
* needs to be created(as per split) if it does not exist.
* @return leaf app node path.
* @throws Exception if any problem occurs while performing ZK operation.
*/
private String getLeafAppIdNodePath(String appId,
boolean createParentIfNotExists) throws Exception {
return getLeafAppIdNodePath(appId, rmAppRootHierarchies.get(
appIdNodeSplitIndex), appIdNodeSplitIndex, createParentIfNotExists);
}
@VisibleForTesting @VisibleForTesting
byte[] getData(final String path) throws Exception { byte[] getData(final String path) throws Exception {
return curatorFramework.getData().forPath(path); return curatorFramework.getData().forPath(path);
@ -930,11 +1227,13 @@ public class ZKRMStateStore extends RMStateStore {
return curatorFramework.getACL().forPath(path); return curatorFramework.getACL().forPath(path);
} }
private List<String> getChildren(final String path) throws Exception { @VisibleForTesting
List<String> getChildren(final String path) throws Exception {
return curatorFramework.getChildren().forPath(path); return curatorFramework.getChildren().forPath(path);
} }
private boolean exists(final String path) throws Exception { @VisibleForTesting
boolean exists(final String path) throws Exception {
return curatorFramework.checkExists().forPath(path) != null; return curatorFramework.checkExists().forPath(path) != null;
} }
@ -963,6 +1262,11 @@ public class ZKRMStateStore extends RMStateStore {
} }
} }
/**
* Deletes the path. Checks for existence of path as well.
* @param path Path to be deleted.
* @throws Exception if any problem occurs while performing deletion.
*/
private void safeDelete(final String path) throws Exception { private void safeDelete(final String path) throws Exception {
if (exists(path)) { if (exists(path)) {
SafeTransaction transaction = new SafeTransaction(); SafeTransaction transaction = new SafeTransaction();

View File

@ -87,8 +87,13 @@ public class TestRMStoreCommands {
ZKRMStateStore.ROOT_ZNODE_NAME + "/" + RMStateStore.RM_APP_ROOT; ZKRMStateStore.ROOT_ZNODE_NAME + "/" + RMStateStore.RM_APP_ROOT;
String appIdPath = appRootPath + "/" + appId; String appIdPath = appRootPath + "/" + appId;
curatorFramework.create().forPath(appIdPath); curatorFramework.create().forPath(appIdPath);
assertEquals("Application node for " + appId + "should exist", for (String path : curatorFramework.getChildren().forPath(appRootPath)) {
appId, curatorFramework.getChildren().forPath(appRootPath).get(0)); if (path.equals(ZKRMStateStore.RM_APP_ROOT_HIERARCHIES)) {
continue;
}
assertEquals("Application node for " + appId + " should exist",
appId, path);
}
try { try {
ResourceManager.removeApplication(conf, appId); ResourceManager.removeApplication(conf, appId);
} catch (Exception e) { } catch (Exception e) {
@ -96,8 +101,10 @@ public class TestRMStoreCommands {
"rm state store."); "rm state store.");
} }
assertTrue("After remove app from store there should be no child nodes" + assertTrue("After remove app from store there should be no child nodes" +
" in app root path", " for application in app root path",
curatorFramework.getChildren().forPath(appRootPath).isEmpty()); curatorFramework.getChildren().forPath(appRootPath).size() == 1 &&
curatorFramework.getChildren().forPath(appRootPath).get(0).equals(
ZKRMStateStore.RM_APP_ROOT_HIERARCHIES));
} }
} }
} }

View File

@ -156,8 +156,7 @@ public class RMStateStoreTestBase {
} }
protected RMApp storeApp(RMStateStore store, ApplicationId appId, protected RMApp storeApp(RMStateStore store, ApplicationId appId,
long submitTime, long submitTime, long startTime) throws Exception {
long startTime) throws Exception {
ApplicationSubmissionContext context = ApplicationSubmissionContext context =
new ApplicationSubmissionContextPBImpl(); new ApplicationSubmissionContextPBImpl();
context.setApplicationId(appId); context.setApplicationId(appId);
@ -200,6 +199,13 @@ public class RMStateStoreTestBase {
return mockAttempt; return mockAttempt;
} }
protected void updateAttempt(RMStateStore store, TestDispatcher dispatcher,
ApplicationAttemptStateData attemptState) {
dispatcher.attemptId = attemptState.getAttemptId();
store.updateApplicationAttemptState(attemptState);
waitNotify(dispatcher);
}
void testRMAppStateStore(RMStateStoreHelper stateStoreHelper) void testRMAppStateStore(RMStateStoreHelper stateStoreHelper)
throws Exception { throws Exception {
testRMAppStateStore(stateStoreHelper, new StoreStateVerifier()); testRMAppStateStore(stateStoreHelper, new StoreStateVerifier());

View File

@ -28,6 +28,8 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.ha.HAServiceProtocol; import org.apache.hadoop.ha.HAServiceProtocol;
import org.apache.hadoop.ha.HAServiceProtocol.StateChangeRequestInfo; import org.apache.hadoop.ha.HAServiceProtocol.StateChangeRequestInfo;
import org.apache.hadoop.io.Text; import org.apache.hadoop.io.Text;
import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.security.token.delegation.DelegationKey; import org.apache.hadoop.security.token.delegation.DelegationKey;
import org.apache.hadoop.service.Service; import org.apache.hadoop.service.Service;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
@ -40,19 +42,25 @@ import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationSubmissionContextPB
import org.apache.hadoop.yarn.api.records.impl.pb.ContainerPBImpl; import org.apache.hadoop.yarn.api.records.impl.pb.ContainerPBImpl;
import org.apache.hadoop.yarn.conf.HAUtil; import org.apache.hadoop.yarn.conf.HAUtil;
import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier; import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier;
import org.apache.hadoop.yarn.server.records.Version; import org.apache.hadoop.yarn.server.records.Version;
import org.apache.hadoop.yarn.server.records.impl.pb.VersionPBImpl; import org.apache.hadoop.yarn.server.records.impl.pb.VersionPBImpl;
import org.apache.hadoop.yarn.server.resourcemanager.MockRM; import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager; import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.RMState;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationAttemptStateData; import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationAttemptStateData;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationStateData; import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationStateData;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.AggregateAppResourceUsage; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.AggregateAppResourceUsage;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptMetrics; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptMetrics;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
import org.apache.hadoop.yarn.server.resourcemanager.security.AMRMTokenSecretManager;
import org.apache.hadoop.yarn.server.resourcemanager.security.ClientToAMTokenSecretManagerInRM; import org.apache.hadoop.yarn.server.resourcemanager.security.ClientToAMTokenSecretManagerInRM;
import org.apache.hadoop.yarn.server.security.MasterKeyData;
import org.apache.zookeeper.KeeperException; import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.ZooDefs.Perms; import org.apache.zookeeper.ZooDefs.Perms;
import org.apache.zookeeper.data.ACL; import org.apache.zookeeper.data.ACL;
@ -61,13 +69,22 @@ import org.junit.Assert;
import org.junit.Before; import org.junit.Before;
import org.junit.Test; import org.junit.Test;
import com.google.common.base.Joiner;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue; import static org.junit.Assert.assertTrue;
import static org.mockito.Mockito.mock; import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.when; import static org.mockito.Mockito.when;
import java.io.IOException; import java.io.IOException;
import java.util.HashMap;
import java.util.List; import java.util.List;
import java.util.Map;
import javax.crypto.SecretKey; import javax.crypto.SecretKey;
@ -131,9 +148,21 @@ public class TestZKRMStateStore extends RMStateStoreTestBase {
return CURRENT_VERSION_INFO; return CURRENT_VERSION_INFO;
} }
private String getAppNode(String appId, int splitIdx) {
String rootPath = workingZnode + "/" + ROOT_ZNODE_NAME + "/" +
RM_APP_ROOT;
String appPath = appId;
if (splitIdx != 0) {
int idx = appId.length() - splitIdx;
appPath = appId.substring(0, idx) + "/" + appId.substring(idx);
return rootPath + "/" + RM_APP_ROOT_HIERARCHIES + "/" +
Integer.toString(splitIdx) + "/" + appPath;
}
return rootPath + "/" + appPath;
}
public String getAppNode(String appId) { public String getAppNode(String appId) {
return workingZnode + "/" + ROOT_ZNODE_NAME + "/" + RM_APP_ROOT + "/" return getAppNode(appId, 0);
+ appId;
} }
public String getAttemptNode(String appId, String attemptId) { public String getAttemptNode(String appId, String attemptId) {
@ -150,8 +179,7 @@ public class TestZKRMStateStore extends RMStateStoreTestBase {
} }
public RMStateStore getRMStateStore() throws Exception { private RMStateStore createStore(Configuration conf) throws Exception {
YarnConfiguration conf = new YarnConfiguration();
workingZnode = "/jira/issue/3077/rmstore"; workingZnode = "/jira/issue/3077/rmstore";
conf.set(YarnConfiguration.RM_ZK_ADDRESS, conf.set(YarnConfiguration.RM_ZK_ADDRESS,
curatorTestingServer.getConnectString()); curatorTestingServer.getConnectString());
@ -160,6 +188,15 @@ public class TestZKRMStateStore extends RMStateStoreTestBase {
return this.store; return this.store;
} }
public RMStateStore getRMStateStore(Configuration conf) throws Exception {
return createStore(conf);
}
public RMStateStore getRMStateStore() throws Exception {
YarnConfiguration conf = new YarnConfiguration();
return createStore(conf);
}
@Override @Override
public boolean isFinalStateValid() throws Exception { public boolean isFinalStateValid() throws Exception {
return 1 == return 1 ==
@ -179,8 +216,12 @@ public class TestZKRMStateStore extends RMStateStoreTestBase {
} }
public boolean appExists(RMApp app) throws Exception { public boolean appExists(RMApp app) throws Exception {
String appIdPath = app.getApplicationId().toString();
int split =
store.getConfig().getInt(YarnConfiguration.ZK_APPID_NODE_SPLIT_INDEX,
YarnConfiguration.DEFAULT_ZK_APPID_NODE_SPLIT_INDEX);
return null != curatorFramework.checkExists() return null != curatorFramework.checkExists()
.forPath(store.getAppNode(app.getApplicationId().toString())); .forPath(store.getAppNode(appIdPath, split));
} }
public boolean attemptExists(RMAppAttempt attempt) throws Exception { public boolean attemptExists(RMAppAttempt attempt) throws Exception {
@ -343,7 +384,6 @@ public class TestZKRMStateStore extends RMStateStoreTestBase {
rm.close(); rm.close();
} }
@SuppressWarnings("unchecked")
@Test @Test
public void testFencing() throws Exception { public void testFencing() throws Exception {
StateChangeRequestInfo req = new StateChangeRequestInfo( StateChangeRequestInfo req = new StateChangeRequestInfo(
@ -383,6 +423,8 @@ public class TestZKRMStateStore extends RMStateStoreTestBase {
assertEquals("RM should be Active", assertEquals("RM should be Active",
HAServiceProtocol.HAServiceState.ACTIVE, HAServiceProtocol.HAServiceState.ACTIVE,
rm2.getRMContext().getRMAdminService().getServiceStatus().getState()); rm2.getRMContext().getRMAdminService().getServiceStatus().getState());
rm1.close();
rm2.close();
} }
@Test @Test
@ -528,4 +570,518 @@ public class TestZKRMStateStore extends RMStateStoreTestBase {
} }
store.close(); store.close();
} }
private static String createPath(String... parts) {
return Joiner.on("/").join(parts);
}
private static Configuration createConfForAppNodeSplit(int splitIndex) {
YarnConfiguration conf = new YarnConfiguration();
conf.setInt(YarnConfiguration.ZK_APPID_NODE_SPLIT_INDEX, splitIndex);
return conf;
}
private static RMApp createMockAppForRemove(ApplicationId appId,
ApplicationAttemptId... attemptIds) {
RMApp app = mock(RMApp.class);
ApplicationSubmissionContextPBImpl context =
new ApplicationSubmissionContextPBImpl();
context.setApplicationId(appId);
when(app.getApplicationSubmissionContext()).thenReturn(context);
when(app.getUser()).thenReturn("test");
if (attemptIds.length > 0) {
HashMap<ApplicationAttemptId, RMAppAttempt> attempts = new HashMap<>();
for (ApplicationAttemptId attemptId : attemptIds) {
RMAppAttempt appAttempt = mock(RMAppAttempt.class);
when(appAttempt.getAppAttemptId()).thenReturn(attemptId);
attempts.put(attemptId, appAttempt);
}
when(app.getAppAttempts()).thenReturn(attempts);
}
return app;
}
private static void verifyLoadedApp(ApplicationStateData appState,
ApplicationId appId, String user, long submitTime, long startTime,
RMAppState state, long finishTime, String diagnostics) {
// Check if app is loaded correctly
assertNotNull("App " + appId + " should have been loaded.", appState);
assertEquals("App submit time in app state", submitTime,
appState.getSubmitTime());
assertEquals("App start time in app state", startTime,
appState.getStartTime());
assertEquals("App ID in app state", appId,
appState.getApplicationSubmissionContext().getApplicationId());
assertEquals("App state", state, appState.getState());
assertEquals("Finish time in app state", finishTime,
appState.getFinishTime());
assertEquals("User in app state", user, appState.getUser());
assertEquals("Diagnostics in app state", diagnostics,
appState.getDiagnostics());
}
private static void verifyLoadedApp(RMState rmState,
ApplicationId appId, long submitTime, long startTime, long finishTime,
boolean isFinished, List<ApplicationAttemptId> attempts) {
verifyLoadedApp(rmState, appId, submitTime, startTime, finishTime,
isFinished, attempts, null, null);
}
private static void verifyLoadedApp(RMState rmState,
ApplicationId appId, long submitTime, long startTime, long finishTime,
boolean isFinished, List<ApplicationAttemptId> attempts,
List<Integer> amExitStatuses,
List<FinalApplicationStatus> finalStatuses) {
Map<ApplicationId, ApplicationStateData> rmAppState =
rmState.getApplicationState();
ApplicationStateData appState = rmAppState.get(appId);
assertNotNull(appId + " is not there in loaded apps", appState);
verifyLoadedApp(appState, appId, "test", submitTime, startTime,
isFinished ? RMAppState.FINISHED : null, finishTime,
isFinished ? "appDiagnostics" : "");
// Check attempt state.
if (attempts != null) {
assertEquals("Attempts loaded for app " + appId, attempts.size(),
appState.attempts.size());
if (finalStatuses != null && amExitStatuses != null) {
for (int i = 0; i < attempts.size(); i++) {
if (finalStatuses.get(i) != null) {
verifyLoadedAttempt(appState, attempts.get(i),
amExitStatuses.get(i), true);
} else {
verifyLoadedAttempt(appState, attempts.get(i),
amExitStatuses.get(i), false);
}
}
}
} else {
assertEquals(
"Attempts loaded for app " + appId, 0, appState.attempts.size());
}
}
private static void verifyLoadedAttempt(ApplicationStateData appState,
ApplicationAttemptId attemptId, int amExitStatus, boolean isFinished) {
verifyLoadedAttempt(appState, attemptId, isFinished ? "myTrackingUrl" :
"N/A", ContainerId.newContainerId(attemptId, 1), null,
isFinished ? RMAppAttemptState.FINISHED : null, isFinished ?
"attemptDiagnostics" : "", 0, amExitStatus,
isFinished ? FinalApplicationStatus.SUCCEEDED : null);
}
private static void verifyLoadedAttempt(ApplicationStateData appState,
ApplicationAttemptId attemptId, String trackingURL,
ContainerId masterContainerId, SecretKey clientTokenKey,
RMAppAttemptState state, String diagnostics, long finishTime,
int amExitStatus, FinalApplicationStatus finalStatus) {
ApplicationAttemptStateData attemptState = appState.getAttempt(attemptId);
// Check if attempt is loaded correctly
assertNotNull(
"Attempt " + attemptId + " should have been loaded.", attemptState);
assertEquals("Attempt Id in attempt state",
attemptId, attemptState.getAttemptId());
assertEquals("Master Container Id in attempt state",
masterContainerId, attemptState.getMasterContainer().getId());
if (null != clientTokenKey) {
assertArrayEquals("Client token key in attempt state",
clientTokenKey.getEncoded(), attemptState.getAppAttemptTokens().
getSecretKey(RMStateStore.AM_CLIENT_TOKEN_MASTER_KEY_NAME));
}
assertEquals("Attempt state", state, attemptState.getState());
assertEquals("Finish time in attempt state", finishTime,
attemptState.getFinishTime());
assertEquals("Diagnostics in attempt state", diagnostics,
attemptState.getDiagnostics());
assertEquals("AM Container exit status in attempt state", amExitStatus,
attemptState.getAMContainerExitStatus());
assertEquals("Final app status in attempt state", finalStatus,
attemptState.getFinalApplicationStatus());
assertEquals("Tracking URL in attempt state", trackingURL,
attemptState.getFinalTrackingUrl());
}
private static ApplicationStateData createAppState(
ApplicationSubmissionContext ctxt, long submitTime, long startTime,
long finishTime, boolean isFinished) {
return ApplicationStateData.newInstance(submitTime, startTime, "test",
ctxt, isFinished ? RMAppState.FINISHED : null, isFinished ?
"appDiagnostics" : "", isFinished ? finishTime : 0, null);
}
private static ApplicationAttemptStateData createFinishedAttempt(
ApplicationAttemptId attemptId, Container container, long startTime,
int amExitStatus) {
return ApplicationAttemptStateData.newInstance(attemptId,
container, null, startTime, RMAppAttemptState.FINISHED,
"myTrackingUrl", "attemptDiagnostics", FinalApplicationStatus.SUCCEEDED,
amExitStatus, 0, 0, 0, 0, 0);
}
private ApplicationAttemptId storeAttempt(RMStateStore store,
TestDispatcher dispatcher, String appAttemptIdStr,
AMRMTokenSecretManager appTokenMgr,
ClientToAMTokenSecretManagerInRM clientToAMTokenMgr,
boolean createContainer) throws Exception {
ApplicationAttemptId attemptId =
ApplicationAttemptId.fromString(appAttemptIdStr);
Token<AMRMTokenIdentifier> appAttemptToken = null;
if (appTokenMgr != null) {
appAttemptToken = generateAMRMToken(attemptId, appTokenMgr);
}
SecretKey clientTokenKey = null;
if (clientToAMTokenMgr != null) {
clientTokenKey = clientToAMTokenMgr.createMasterKey(attemptId);
Credentials attemptCred = new Credentials();
attemptCred.addSecretKey(RMStateStore.AM_CLIENT_TOKEN_MASTER_KEY_NAME,
clientTokenKey.getEncoded());
}
ContainerId containerId = null;
if (createContainer) {
containerId = ContainerId.newContainerId(attemptId, 1);
}
storeAttempt(store, attemptId, containerId.toString(), appAttemptToken,
clientTokenKey, dispatcher);
return attemptId;
}
private void finishAppWithAttempts(RMState state, RMStateStore store,
TestDispatcher dispatcher, ApplicationAttemptId attemptId,
long submitTime, long startTime, int amExitStatus, long finishTime,
boolean createNewApp) throws Exception {
ApplicationId appId = attemptId.getApplicationId();
ApplicationStateData appStateNew = null;
if (createNewApp) {
ApplicationSubmissionContext context =
new ApplicationSubmissionContextPBImpl();
context.setApplicationId(appId);
appStateNew = createAppState(context, submitTime, startTime, finishTime,
true);
} else {
ApplicationStateData appState = state.getApplicationState().get(appId);
appStateNew = createAppState(appState.getApplicationSubmissionContext(),
submitTime, startTime, finishTime, true);
appStateNew.attempts.putAll(appState.attempts);
}
store.updateApplicationState(appStateNew);
waitNotify(dispatcher);
Container container = new ContainerPBImpl();
container.setId(ContainerId.newContainerId(attemptId, 1));
ApplicationAttemptStateData newAttemptState =
createFinishedAttempt(attemptId, container, startTime, amExitStatus);
updateAttempt(store, dispatcher, newAttemptState);
}
private void storeAppWithAttempts(RMStateStore store,
TestDispatcher dispatcher, ApplicationAttemptId attemptId,
long submitTime, long startTime) throws Exception {
storeAppWithAttempts(store, dispatcher, submitTime, startTime, null, null,
attemptId);
}
private void storeApp(RMStateStore store, TestDispatcher dispatcher,
ApplicationId appId, long submitTime, long startTime) throws Exception {
storeApp(store, appId, submitTime, startTime);
waitNotify(dispatcher);
}
private void storeAppWithAttempts(RMStateStore store,
TestDispatcher dispatcher, long submitTime, long startTime,
AMRMTokenSecretManager appTokenMgr,
ClientToAMTokenSecretManagerInRM clientToAMTokenMgr,
ApplicationAttemptId attemptId, ApplicationAttemptId... attemptIds)
throws Exception {
ApplicationId appId = attemptId.getApplicationId();
storeApp(store, dispatcher, appId, submitTime, startTime);
storeAttempt(store, dispatcher, attemptId.toString(), appTokenMgr,
clientToAMTokenMgr, true);
for (ApplicationAttemptId attempt : attemptIds) {
storeAttempt(store, dispatcher, attempt.toString(), appTokenMgr,
clientToAMTokenMgr, true);
}
}
private static void removeApps(RMStateStore store,
Map<ApplicationId, ApplicationAttemptId[]> appWithAttempts) {
for (Map.Entry<ApplicationId, ApplicationAttemptId[]> entry :
appWithAttempts.entrySet()) {
RMApp mockApp = createMockAppForRemove(entry.getKey(), entry.getValue());
store.removeApplication(mockApp);
}
}
private static void verifyAppPathPath(RMStateStore store, ApplicationId appId,
int splitIndex) throws Exception {
String appIdStr = appId.toString();
String appParent = appIdStr.substring(0, appIdStr.length() - splitIndex);
String appPath = appIdStr.substring(appIdStr.length() - splitIndex);
String path = createPath(((ZKRMStateStore)store).znodeWorkingPath,
ZKRMStateStore.ROOT_ZNODE_NAME, ZKRMStateStore.RM_APP_ROOT,
ZKRMStateStore.RM_APP_ROOT_HIERARCHIES, String.valueOf(splitIndex),
appParent, appPath);
assertTrue("Application with id " + appIdStr + " does not exist as per " +
"split in state store.", ((ZKRMStateStore)store).exists(path));
}
private static void verifyAppInHierarchicalPath(RMStateStore store,
String appId, int splitIdx) throws Exception {
String path = createPath(((ZKRMStateStore)store).znodeWorkingPath,
ZKRMStateStore.ROOT_ZNODE_NAME, ZKRMStateStore.RM_APP_ROOT);
if (splitIdx != 0) {
path = createPath(path, ZKRMStateStore.RM_APP_ROOT_HIERARCHIES,
String.valueOf(splitIdx), appId.substring(0, appId.length() -
splitIdx), appId.substring(appId.length() - splitIdx));
} else {
path = createPath(path, appId);
}
assertTrue(appId + " should exist in path " + path,
((ZKRMStateStore)store).exists(createPath(path)));
}
private static void assertHierarchicalPaths(RMStateStore store,
Map<Integer, Integer> pathToApps) throws Exception {
for (Map.Entry<Integer, Integer> entry : pathToApps.entrySet()) {
String path = createPath(((ZKRMStateStore)store).znodeWorkingPath,
ZKRMStateStore.ROOT_ZNODE_NAME, ZKRMStateStore.RM_APP_ROOT);
if (entry.getKey() != 0) {
path = createPath(path, ZKRMStateStore.RM_APP_ROOT_HIERARCHIES,
String.valueOf(entry.getKey()));
}
assertEquals("Number of childrens for path " + path,
(int) entry.getValue(),
((ZKRMStateStore)store).getChildren(path).size());
}
}
// Test to verify storing of apps and app attempts in ZK state store with app
// node split index configured more than 0.
@Test
public void testAppNodeSplit() throws Exception {
TestZKRMStateStoreTester zkTester = new TestZKRMStateStoreTester();
long submitTime = System.currentTimeMillis();
long startTime = submitTime + 1234;
Configuration conf = new YarnConfiguration();
// Get store with app node split config set as 1.
RMStateStore store = zkTester.getRMStateStore(createConfForAppNodeSplit(1));
TestDispatcher dispatcher = new TestDispatcher();
store.setRMDispatcher(dispatcher);
// Create RM Context and app token manager.
RMContext rmContext = mock(RMContext.class);
when(rmContext.getStateStore()).thenReturn(store);
AMRMTokenSecretManager appTokenMgr =
spy(new AMRMTokenSecretManager(conf, rmContext));
MasterKeyData masterKeyData = appTokenMgr.createNewMasterKey();
when(appTokenMgr.getMasterKey()).thenReturn(masterKeyData);
ClientToAMTokenSecretManagerInRM clientToAMTokenMgr =
new ClientToAMTokenSecretManagerInRM();
// Store app1.
ApplicationId appId1 = ApplicationId.newInstance(1352994193343L, 1);
ApplicationAttemptId attemptId1 =
ApplicationAttemptId.newInstance(appId1, 1);
ApplicationAttemptId attemptId2 =
ApplicationAttemptId.newInstance(appId1, 2);
storeAppWithAttempts(store, dispatcher, submitTime, startTime,
appTokenMgr, clientToAMTokenMgr, attemptId1, attemptId2);
// Store app2 with app id application_1352994193343_120213.
ApplicationId appId21 = ApplicationId.newInstance(1352994193343L, 120213);
storeApp(store, appId21, submitTime, startTime);
waitNotify(dispatcher);
// Store another app which will be removed.
ApplicationId appIdRemoved = ApplicationId.newInstance(1352994193343L, 2);
ApplicationAttemptId attemptIdRemoved =
ApplicationAttemptId.newInstance(appIdRemoved, 1);
storeAppWithAttempts(store, dispatcher, submitTime, startTime,
null, null, attemptIdRemoved);
// Remove the app.
RMApp mockRemovedApp =
createMockAppForRemove(appIdRemoved, attemptIdRemoved);
store.removeApplication(mockRemovedApp);
// Close state store
store.close();
// Load state store
store = zkTester.getRMStateStore(createConfForAppNodeSplit(1));
store.setRMDispatcher(dispatcher);
RMState state = store.loadState();
// Check if application_1352994193343_120213 (i.e. app2) exists in state
// store as per split index.
verifyAppPathPath(store, appId21, 1);
// Verify loaded apps and attempts based on the operations we did before
// reloading the state store.
verifyLoadedApp(state, appId1, submitTime, startTime, 0, false,
Lists.newArrayList(attemptId1, attemptId2), Lists.newArrayList(-1000,
-1000), Lists.newArrayList((FinalApplicationStatus) null, null));
// Update app state for app1.
finishAppWithAttempts(state, store, dispatcher, attemptId2, submitTime,
startTime, 100, 1234, false);
// Test updating app/attempt for app whose initial state is not saved
ApplicationId dummyAppId = ApplicationId.newInstance(1234, 10);
ApplicationAttemptId dummyAttemptId =
ApplicationAttemptId.newInstance(dummyAppId, 6);
finishAppWithAttempts(state, store, dispatcher, dummyAttemptId, submitTime,
startTime, 111, 1234, true);
// Close the store
store.close();
// Check updated application state.
store = zkTester.getRMStateStore(createConfForAppNodeSplit(1));
store.setRMDispatcher(dispatcher);
RMState newRMState = store.loadState();
verifyLoadedApp(newRMState, dummyAppId, submitTime, startTime, 1234, true,
Lists.newArrayList(dummyAttemptId), Lists.newArrayList(111),
Lists.newArrayList(FinalApplicationStatus.SUCCEEDED));
verifyLoadedApp(newRMState, appId1, submitTime, startTime, 1234, true,
Lists.newArrayList(attemptId1, attemptId2),
Lists.newArrayList(-1000, 100), Lists.newArrayList(null,
FinalApplicationStatus.SUCCEEDED));
// assert store is in expected state after everything is cleaned
assertTrue("Store is not in expected state", zkTester.isFinalStateValid());
store.close();
}
// Test to verify storing of apps and app attempts in ZK state store with app
// node split index config changing across restarts.
@Test
public void testAppNodeSplitChangeAcrossRestarts() throws Exception {
TestZKRMStateStoreTester zkTester = new TestZKRMStateStoreTester();
long submitTime = System.currentTimeMillis();
long startTime = submitTime + 1234;
Configuration conf = new YarnConfiguration();
// Create store with app node split set as 1.
RMStateStore store = zkTester.getRMStateStore(createConfForAppNodeSplit(1));
TestDispatcher dispatcher = new TestDispatcher();
store.setRMDispatcher(dispatcher);
RMContext rmContext = mock(RMContext.class);
when(rmContext.getStateStore()).thenReturn(store);
AMRMTokenSecretManager appTokenMgr =
spy(new AMRMTokenSecretManager(conf, rmContext));
MasterKeyData masterKeyData = appTokenMgr.createNewMasterKey();
when(appTokenMgr.getMasterKey()).thenReturn(masterKeyData);
ClientToAMTokenSecretManagerInRM clientToAMTokenMgr =
new ClientToAMTokenSecretManagerInRM();
// Store app1 with 2 attempts.
ApplicationId appId1 = ApplicationId.newInstance(1442994194053L, 1);
ApplicationAttemptId attemptId1 =
ApplicationAttemptId.newInstance(appId1, 1);
ApplicationAttemptId attemptId2 =
ApplicationAttemptId.newInstance(appId1, 2);
storeAppWithAttempts(store, dispatcher, submitTime, startTime,
appTokenMgr, clientToAMTokenMgr, attemptId1, attemptId2);
// Store app2 and associated attempt.
ApplicationId appId11 = ApplicationId.newInstance(1442994194053L, 2);
ApplicationAttemptId attemptId11 =
ApplicationAttemptId.newInstance(appId11, 1);
storeAppWithAttempts(store, dispatcher, attemptId11, submitTime, startTime);
// Close state store
store.close();
// Load state store with app node split config of 2.
store = zkTester.getRMStateStore(createConfForAppNodeSplit(2));
store.setRMDispatcher(dispatcher);
RMState state = store.loadState();
ApplicationId appId21 = ApplicationId.newInstance(1442994194053L, 120213);
storeApp(store, dispatcher, appId21, submitTime, startTime);
// Check if app is loaded correctly despite change in split index.
verifyLoadedApp(state, appId1, submitTime, startTime, 0, false,
Lists.newArrayList(attemptId1, attemptId2), Lists.newArrayList(-1000,
-1000), Lists.newArrayList((FinalApplicationStatus) null, null));
// Finish app/attempt state
finishAppWithAttempts(state, store, dispatcher, attemptId2, submitTime,
startTime, 100, 1234, false);
// Test updating app/attempt for app whose initial state is not saved
ApplicationId dummyAppId = ApplicationId.newInstance(1234, 10);
ApplicationAttemptId dummyAttemptId =
ApplicationAttemptId.newInstance(dummyAppId, 6);
finishAppWithAttempts(state, store, dispatcher, dummyAttemptId, submitTime,
startTime, 111, 1234, true);
// Close the store
store.close();
// Load state store this time with split index of 0.
store = zkTester.getRMStateStore(createConfForAppNodeSplit(0));
store.setRMDispatcher(dispatcher);
state = store.loadState();
assertEquals("Number of Apps loaded should be 4.", 4,
state.getApplicationState().size());
verifyLoadedApp(state, appId1, submitTime, startTime, 1234, true,
Lists.newArrayList(attemptId1, attemptId2), Lists.newArrayList(-1000,
100), Lists.newArrayList(null, FinalApplicationStatus.SUCCEEDED));
// Remove attempt1
store.removeApplicationAttempt(attemptId1);
ApplicationId appId31 = ApplicationId.newInstance(1442994195071L, 45);
storeApp(store, dispatcher, appId31, submitTime, startTime);
// Close state store.
store.close();
// Load state store with split index of 3.
store = zkTester.getRMStateStore(createConfForAppNodeSplit(3));
store.setRMDispatcher(dispatcher);
state = store.loadState();
assertEquals("Number of apps loaded should be 5.", 5,
state.getApplicationState().size());
verifyLoadedApp(state, dummyAppId, submitTime, startTime, 1234, true,
Lists.newArrayList(dummyAttemptId), Lists.newArrayList(111),
Lists.newArrayList(FinalApplicationStatus.SUCCEEDED));
verifyLoadedApp(state, appId31, submitTime, startTime, 0, false, null);
verifyLoadedApp(state, appId21, submitTime, startTime, 0, false, null);
verifyLoadedApp(state, appId11, submitTime, startTime, 0, false,
Lists.newArrayList(attemptId11), Lists.newArrayList(-1000),
Lists.newArrayList((FinalApplicationStatus) null));
verifyLoadedApp(state, appId1, submitTime, startTime, 1234, true,
Lists.newArrayList(attemptId2), Lists.newArrayList(100),
Lists.newArrayList(FinalApplicationStatus.SUCCEEDED));
// Store another app.
ApplicationId appId41 = ApplicationId.newInstance(1442994195087L, 1);
storeApp(store, dispatcher, appId41, submitTime, startTime);
// Check how many apps exist in each of the hierarchy based paths. 0 paths
// should exist in "HIERARCHIES/4" path as app split index was never set
// as 4 in tests above.
assertHierarchicalPaths(store, ImmutableMap.of(0, 2, 1, 1, 2, 2,
3, 1, 4, 0));
verifyAppInHierarchicalPath(store, "application_1442994195087_0001", 3);
ApplicationId appId71 = ApplicationId.newInstance(1442994195087L, 7);
//storeApp(store, dispatcher, appId71, submitTime, startTime);
storeApp(store, appId71, submitTime, startTime);
waitNotify(dispatcher);
ApplicationAttemptId attemptId71 =
ApplicationAttemptId.newInstance(appId71, 1);
storeAttempt(store, ApplicationAttemptId.newInstance(appId71, 1),
ContainerId.newContainerId(attemptId71, 1).toString(), null, null,
dispatcher);
// Remove applications.
removeApps(store, ImmutableMap.of(appId11, new ApplicationAttemptId[]
{attemptId11}, appId71, new ApplicationAttemptId[] {attemptId71},
appId41, new ApplicationAttemptId[0], appId31,
new ApplicationAttemptId[0], appId21, new ApplicationAttemptId[0]));
removeApps(store, ImmutableMap.of(dummyAppId,
new ApplicationAttemptId[] {dummyAttemptId}, appId1,
new ApplicationAttemptId[] {attemptId1, attemptId2}));
store.close();
// Load state store with split index of 3 again. As all apps have been
// removed nothing should be loaded back.
store = zkTester.getRMStateStore(createConfForAppNodeSplit(3));
store.setRMDispatcher(dispatcher);
state = store.loadState();
assertEquals("Number of apps loaded should be 0.", 0,
state.getApplicationState().size());
// Close the state store.
store.close();
}
} }