YARN-7262. Add a hierarchy into the ZKRMStateStore for delegation token znodes to prevent jute buffer overflow (rkanter)
(cherry picked from commit b1de78619f
)
This commit is contained in:
parent
cd9078df68
commit
aa65f6c1ad
|
@ -610,6 +610,13 @@ public class YarnConfiguration extends Configuration {
|
||||||
RM_ZK_PREFIX + "appid-node.split-index";
|
RM_ZK_PREFIX + "appid-node.split-index";
|
||||||
public static final int DEFAULT_ZK_APPID_NODE_SPLIT_INDEX = 0;
|
public static final int DEFAULT_ZK_APPID_NODE_SPLIT_INDEX = 0;
|
||||||
|
|
||||||
|
/** Index at which the RM Delegation Token ids will be split so that the
|
||||||
|
* delegation token znodes stored in the zookeeper RM state store will be
|
||||||
|
* stored as two different znodes (parent-child). **/
|
||||||
|
public static final String ZK_DELEGATION_TOKEN_NODE_SPLIT_INDEX =
|
||||||
|
RM_ZK_PREFIX + "delegation-token-node.split-index";
|
||||||
|
public static final int DEFAULT_ZK_DELEGATION_TOKEN_NODE_SPLIT_INDEX = 0;
|
||||||
|
|
||||||
public static final String RM_ZK_ACL = RM_ZK_PREFIX + "acl";
|
public static final String RM_ZK_ACL = RM_ZK_PREFIX + "acl";
|
||||||
public static final String DEFAULT_RM_ZK_ACL = "world:anyone:rwcda";
|
public static final String DEFAULT_RM_ZK_ACL = "world:anyone:rwcda";
|
||||||
|
|
||||||
|
|
|
@ -593,6 +593,24 @@
|
||||||
<value>0</value>
|
<value>0</value>
|
||||||
</property>
|
</property>
|
||||||
|
|
||||||
|
<property>
|
||||||
|
<description>Index at which the RM Delegation Token ids will be split so
|
||||||
|
that the delegation token znodes stored in the zookeeper RM state store
|
||||||
|
will be stored as two different znodes (parent-child). The split is done
|
||||||
|
from the end. For instance, with no split, a delegation token znode will
|
||||||
|
be of the form RMDelegationToken_123456789. If the value of this config is
|
||||||
|
1, the delegation token znode will be broken into two parts:
|
||||||
|
RMDelegationToken_12345678 and 9 respectively with former being the parent
|
||||||
|
node. This config can take values from 0 to 4. 0 means there will be no
|
||||||
|
split. If the value is outside this range, it will be treated as 0 (i.e.
|
||||||
|
no split). A value larger than 0 (up to 4) should be configured if you are
|
||||||
|
running a large number of applications, with long-lived delegation tokens
|
||||||
|
and state store operations (e.g. failover) are failing due to LenError in
|
||||||
|
Zookeeper.</description>
|
||||||
|
<name>yarn.resourcemanager.zk-delegation-token-node.split-index</name>
|
||||||
|
<value>0</value>
|
||||||
|
</property>
|
||||||
|
|
||||||
<property>
|
<property>
|
||||||
<description>Specifies the maximum size of the data that can be stored
|
<description>Specifies the maximum size of the data that can be stored
|
||||||
in a znode. Value should be same or less than jute.maxbuffer configured
|
in a znode. Value should be same or less than jute.maxbuffer configured
|
||||||
|
|
|
@ -89,6 +89,8 @@ public abstract class RMStateStore extends AbstractService {
|
||||||
@VisibleForTesting
|
@VisibleForTesting
|
||||||
public static final String RM_APP_ROOT = "RMAppRoot";
|
public static final String RM_APP_ROOT = "RMAppRoot";
|
||||||
protected static final String RM_DT_SECRET_MANAGER_ROOT = "RMDTSecretManagerRoot";
|
protected static final String RM_DT_SECRET_MANAGER_ROOT = "RMDTSecretManagerRoot";
|
||||||
|
protected static final String RM_DELEGATION_TOKENS_ROOT_ZNODE_NAME =
|
||||||
|
"RMDelegationTokensRoot";
|
||||||
protected static final String DELEGATION_KEY_PREFIX = "DelegationKey_";
|
protected static final String DELEGATION_KEY_PREFIX = "DelegationKey_";
|
||||||
protected static final String DELEGATION_TOKEN_PREFIX = "RMDelegationToken_";
|
protected static final String DELEGATION_TOKEN_PREFIX = "RMDelegationToken_";
|
||||||
protected static final String DELEGATION_TOKEN_SEQUENCE_NUMBER_PREFIX =
|
protected static final String DELEGATION_TOKEN_SEQUENCE_NUMBER_PREFIX =
|
||||||
|
|
|
@ -118,6 +118,22 @@ import java.util.Set;
|
||||||
* |--- RM_DT_SECRET_MANAGER_ROOT
|
* |--- RM_DT_SECRET_MANAGER_ROOT
|
||||||
* |----- RM_DT_SEQUENTIAL_NUMBER_ZNODE_NAME
|
* |----- RM_DT_SEQUENTIAL_NUMBER_ZNODE_NAME
|
||||||
* |----- RM_DELEGATION_TOKENS_ROOT_ZNODE_NAME
|
* |----- RM_DELEGATION_TOKENS_ROOT_ZNODE_NAME
|
||||||
|
* | |----- 1
|
||||||
|
* | | |----- (#TokenId barring last character)
|
||||||
|
* | | | |----- (#Last character of TokenId)
|
||||||
|
* | | ....
|
||||||
|
* | |----- 2
|
||||||
|
* | | |----- (#TokenId barring last 2 characters)
|
||||||
|
* | | | |----- (#Last 2 characters of TokenId)
|
||||||
|
* | | ....
|
||||||
|
* | |----- 3
|
||||||
|
* | | |----- (#TokenId barring last 3 characters)
|
||||||
|
* | | | |----- (#Last 3 characters of TokenId)
|
||||||
|
* | | ....
|
||||||
|
* | |----- 4
|
||||||
|
* | | |----- (#TokenId barring last 4 characters)
|
||||||
|
* | | | |----- (#Last 4 characters of TokenId)
|
||||||
|
* | | ....
|
||||||
* | |----- Token_1
|
* | |----- Token_1
|
||||||
* | |----- Token_2
|
* | |----- Token_2
|
||||||
* | ....
|
* | ....
|
||||||
|
@ -147,6 +163,11 @@ import java.util.Set;
|
||||||
* splitting it in 2 parts, depending on a configurable split index. This limits
|
* splitting it in 2 parts, depending on a configurable split index. This limits
|
||||||
* the number of application znodes returned in a single call while loading
|
* the number of application znodes returned in a single call while loading
|
||||||
* app state.
|
* app state.
|
||||||
|
*
|
||||||
|
* Changes from 1.4 to 1.5 - Change the structure of delegation token znode by
|
||||||
|
* splitting it in 2 parts, depending on a configurable split index. This limits
|
||||||
|
* the number of delegation token znodes returned in a single call while loading
|
||||||
|
* tokens state.
|
||||||
*/
|
*/
|
||||||
@Private
|
@Private
|
||||||
@Unstable
|
@Unstable
|
||||||
|
@ -162,7 +183,7 @@ public class ZKRMStateStore extends RMStateStore {
|
||||||
@VisibleForTesting
|
@VisibleForTesting
|
||||||
public static final String ROOT_ZNODE_NAME = "ZKRMStateRoot";
|
public static final String ROOT_ZNODE_NAME = "ZKRMStateRoot";
|
||||||
protected static final Version CURRENT_VERSION_INFO = Version
|
protected static final Version CURRENT_VERSION_INFO = Version
|
||||||
.newInstance(1, 4);
|
.newInstance(1, 5);
|
||||||
@VisibleForTesting
|
@VisibleForTesting
|
||||||
public static final String RM_APP_ROOT_HIERARCHIES = "HIERARCHIES";
|
public static final String RM_APP_ROOT_HIERARCHIES = "HIERARCHIES";
|
||||||
|
|
||||||
|
@ -170,6 +191,7 @@ public class ZKRMStateStore extends RMStateStore {
|
||||||
private String zkRootNodePath;
|
private String zkRootNodePath;
|
||||||
private String rmAppRoot;
|
private String rmAppRoot;
|
||||||
private Map<Integer, String> rmAppRootHierarchies;
|
private Map<Integer, String> rmAppRootHierarchies;
|
||||||
|
private Map<Integer, String> rmDelegationTokenHierarchies;
|
||||||
private String rmDTSecretManagerRoot;
|
private String rmDTSecretManagerRoot;
|
||||||
private String dtMasterKeysRootPath;
|
private String dtMasterKeysRootPath;
|
||||||
private String delegationTokensRootPath;
|
private String delegationTokensRootPath;
|
||||||
|
@ -180,6 +202,8 @@ public class ZKRMStateStore extends RMStateStore {
|
||||||
@VisibleForTesting
|
@VisibleForTesting
|
||||||
protected String znodeWorkingPath;
|
protected String znodeWorkingPath;
|
||||||
private int appIdNodeSplitIndex = 0;
|
private int appIdNodeSplitIndex = 0;
|
||||||
|
@VisibleForTesting
|
||||||
|
protected int delegationTokenNodeSplitIndex = 0;
|
||||||
|
|
||||||
/* Fencing related variables */
|
/* Fencing related variables */
|
||||||
private static final String FENCING_LOCK = "RM_ZK_FENCING_LOCK";
|
private static final String FENCING_LOCK = "RM_ZK_FENCING_LOCK";
|
||||||
|
@ -212,12 +236,13 @@ public class ZKRMStateStore extends RMStateStore {
|
||||||
};
|
};
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Encapsulates full app node path and corresponding split index.
|
* Encapsulates znode path and corresponding split index for hierarchical
|
||||||
|
* znode layouts.
|
||||||
*/
|
*/
|
||||||
private final static class AppNodeSplitInfo {
|
private final static class ZnodeSplitInfo {
|
||||||
private final String path;
|
private final String path;
|
||||||
private final int splitIndex;
|
private final int splitIndex;
|
||||||
AppNodeSplitInfo(String path, int splitIndex) {
|
ZnodeSplitInfo(String path, int splitIndex) {
|
||||||
this.path = path;
|
this.path = path;
|
||||||
this.splitIndex = splitIndex;
|
this.splitIndex = splitIndex;
|
||||||
}
|
}
|
||||||
|
@ -288,7 +313,7 @@ public class ZKRMStateStore extends RMStateStore {
|
||||||
appIdNodeSplitIndex =
|
appIdNodeSplitIndex =
|
||||||
conf.getInt(YarnConfiguration.ZK_APPID_NODE_SPLIT_INDEX,
|
conf.getInt(YarnConfiguration.ZK_APPID_NODE_SPLIT_INDEX,
|
||||||
YarnConfiguration.DEFAULT_ZK_APPID_NODE_SPLIT_INDEX);
|
YarnConfiguration.DEFAULT_ZK_APPID_NODE_SPLIT_INDEX);
|
||||||
if (appIdNodeSplitIndex < 1 || appIdNodeSplitIndex > 4) {
|
if (appIdNodeSplitIndex < 0 || appIdNodeSplitIndex > 4) {
|
||||||
LOG.info("Invalid value " + appIdNodeSplitIndex + " for config " +
|
LOG.info("Invalid value " + appIdNodeSplitIndex + " for config " +
|
||||||
YarnConfiguration.ZK_APPID_NODE_SPLIT_INDEX + " specified. " +
|
YarnConfiguration.ZK_APPID_NODE_SPLIT_INDEX + " specified. " +
|
||||||
"Resetting it to " +
|
"Resetting it to " +
|
||||||
|
@ -322,12 +347,30 @@ public class ZKRMStateStore extends RMStateStore {
|
||||||
RM_DT_MASTER_KEYS_ROOT_ZNODE_NAME);
|
RM_DT_MASTER_KEYS_ROOT_ZNODE_NAME);
|
||||||
delegationTokensRootPath = getNodePath(rmDTSecretManagerRoot,
|
delegationTokensRootPath = getNodePath(rmDTSecretManagerRoot,
|
||||||
RM_DELEGATION_TOKENS_ROOT_ZNODE_NAME);
|
RM_DELEGATION_TOKENS_ROOT_ZNODE_NAME);
|
||||||
|
rmDelegationTokenHierarchies = new HashMap<>(5);
|
||||||
|
rmDelegationTokenHierarchies.put(0, delegationTokensRootPath);
|
||||||
|
for (int splitIndex = 1; splitIndex <= 4; splitIndex++) {
|
||||||
|
rmDelegationTokenHierarchies.put(splitIndex,
|
||||||
|
getNodePath(delegationTokensRootPath, Integer.toString(splitIndex)));
|
||||||
|
}
|
||||||
dtSequenceNumberPath = getNodePath(rmDTSecretManagerRoot,
|
dtSequenceNumberPath = getNodePath(rmDTSecretManagerRoot,
|
||||||
RM_DT_SEQUENTIAL_NUMBER_ZNODE_NAME);
|
RM_DT_SEQUENTIAL_NUMBER_ZNODE_NAME);
|
||||||
amrmTokenSecretManagerRoot =
|
amrmTokenSecretManagerRoot =
|
||||||
getNodePath(zkRootNodePath, AMRMTOKEN_SECRET_MANAGER_ROOT);
|
getNodePath(zkRootNodePath, AMRMTOKEN_SECRET_MANAGER_ROOT);
|
||||||
reservationRoot = getNodePath(zkRootNodePath, RESERVATION_SYSTEM_ROOT);
|
reservationRoot = getNodePath(zkRootNodePath, RESERVATION_SYSTEM_ROOT);
|
||||||
zkManager = resourceManager.getAndStartZKManager(conf);
|
zkManager = resourceManager.getAndStartZKManager(conf);
|
||||||
|
delegationTokenNodeSplitIndex =
|
||||||
|
conf.getInt(YarnConfiguration.ZK_DELEGATION_TOKEN_NODE_SPLIT_INDEX,
|
||||||
|
YarnConfiguration.DEFAULT_ZK_DELEGATION_TOKEN_NODE_SPLIT_INDEX);
|
||||||
|
if (delegationTokenNodeSplitIndex < 0
|
||||||
|
|| delegationTokenNodeSplitIndex > 4) {
|
||||||
|
LOG.info("Invalid value " + delegationTokenNodeSplitIndex + " for config "
|
||||||
|
+ YarnConfiguration.ZK_DELEGATION_TOKEN_NODE_SPLIT_INDEX
|
||||||
|
+ " specified. Resetting it to " +
|
||||||
|
YarnConfiguration.DEFAULT_ZK_DELEGATION_TOKEN_NODE_SPLIT_INDEX);
|
||||||
|
delegationTokenNodeSplitIndex =
|
||||||
|
YarnConfiguration.DEFAULT_ZK_DELEGATION_TOKEN_NODE_SPLIT_INDEX;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -350,6 +393,9 @@ public class ZKRMStateStore extends RMStateStore {
|
||||||
create(rmDTSecretManagerRoot);
|
create(rmDTSecretManagerRoot);
|
||||||
create(dtMasterKeysRootPath);
|
create(dtMasterKeysRootPath);
|
||||||
create(delegationTokensRootPath);
|
create(delegationTokensRootPath);
|
||||||
|
for (int splitIndex = 1; splitIndex <= 4; splitIndex++) {
|
||||||
|
create(rmDelegationTokenHierarchies.get(splitIndex));
|
||||||
|
}
|
||||||
create(dtSequenceNumberPath);
|
create(dtSequenceNumberPath);
|
||||||
create(amrmTokenSecretManagerRoot);
|
create(amrmTokenSecretManagerRoot);
|
||||||
create(reservationRoot);
|
create(reservationRoot);
|
||||||
|
@ -572,36 +618,63 @@ public class ZKRMStateStore extends RMStateStore {
|
||||||
}
|
}
|
||||||
|
|
||||||
private void loadRMDelegationTokenState(RMState rmState) throws Exception {
|
private void loadRMDelegationTokenState(RMState rmState) throws Exception {
|
||||||
List<String> childNodes =
|
for (int splitIndex = 0; splitIndex <= 4; splitIndex++) {
|
||||||
getChildren(delegationTokensRootPath);
|
String tokenRoot = rmDelegationTokenHierarchies.get(splitIndex);
|
||||||
|
if (tokenRoot == null) {
|
||||||
for (String childNodeName : childNodes) {
|
|
||||||
String childNodePath =
|
|
||||||
getNodePath(delegationTokensRootPath, childNodeName);
|
|
||||||
byte[] childData = getData(childNodePath);
|
|
||||||
|
|
||||||
if (childData == null) {
|
|
||||||
LOG.warn("Content of " + childNodePath + " is broken.");
|
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
List<String> childNodes = getChildren(tokenRoot);
|
||||||
ByteArrayInputStream is = new ByteArrayInputStream(childData);
|
boolean dtNodeFound = false;
|
||||||
|
for (String childNodeName : childNodes) {
|
||||||
try (DataInputStream fsIn = new DataInputStream(is)) {
|
|
||||||
if (childNodeName.startsWith(DELEGATION_TOKEN_PREFIX)) {
|
if (childNodeName.startsWith(DELEGATION_TOKEN_PREFIX)) {
|
||||||
RMDelegationTokenIdentifierData identifierData =
|
dtNodeFound = true;
|
||||||
new RMDelegationTokenIdentifierData();
|
String parentNodePath = getNodePath(tokenRoot, childNodeName);
|
||||||
identifierData.readFields(fsIn);
|
if (splitIndex == 0) {
|
||||||
RMDelegationTokenIdentifier identifier =
|
loadDelegationTokenFromNode(rmState, parentNodePath);
|
||||||
identifierData.getTokenIdentifier();
|
} else {
|
||||||
long renewDate = identifierData.getRenewDate();
|
// If znode is partitioned.
|
||||||
rmState.rmSecretManagerState.delegationTokenState.put(identifier,
|
List<String> leafNodes = getChildren(parentNodePath);
|
||||||
renewDate);
|
for (String leafNodeName : leafNodes) {
|
||||||
|
loadDelegationTokenFromNode(rmState,
|
||||||
if (LOG.isDebugEnabled()) {
|
getNodePath(parentNodePath, leafNodeName));
|
||||||
LOG.debug("Loaded RMDelegationTokenIdentifier: " + identifier
|
}
|
||||||
+ " renewDate=" + renewDate);
|
|
||||||
}
|
}
|
||||||
|
} else if (splitIndex == 0
|
||||||
|
&& !(childNodeName.equals("1") || childNodeName.equals("2")
|
||||||
|
|| childNodeName.equals("3") || childNodeName.equals("4"))) {
|
||||||
|
LOG.debug("Unknown child node with name " + childNodeName + " under" +
|
||||||
|
tokenRoot);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if (splitIndex != delegationTokenNodeSplitIndex && !dtNodeFound) {
|
||||||
|
// If no loaded delegation token exists for a particular split index and
|
||||||
|
// the split index for which tokens are being loaded is not the one
|
||||||
|
// configured, then we do not need to keep track of this hierarchy for
|
||||||
|
// storing/updating/removing delegation token znodes.
|
||||||
|
rmDelegationTokenHierarchies.remove(splitIndex);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private void loadDelegationTokenFromNode(RMState rmState, String path)
|
||||||
|
throws Exception {
|
||||||
|
byte[] data = getData(path);
|
||||||
|
if (data == null) {
|
||||||
|
LOG.warn("Content of " + path + " is broken.");
|
||||||
|
} else {
|
||||||
|
ByteArrayInputStream is = new ByteArrayInputStream(data);
|
||||||
|
try (DataInputStream fsIn = new DataInputStream(is)) {
|
||||||
|
RMDelegationTokenIdentifierData identifierData =
|
||||||
|
new RMDelegationTokenIdentifierData();
|
||||||
|
identifierData.readFields(fsIn);
|
||||||
|
RMDelegationTokenIdentifier identifier =
|
||||||
|
identifierData.getTokenIdentifier();
|
||||||
|
long renewDate = identifierData.getRenewDate();
|
||||||
|
rmState.rmSecretManagerState.delegationTokenState.put(identifier,
|
||||||
|
renewDate);
|
||||||
|
if (LOG.isDebugEnabled()) {
|
||||||
|
LOG.debug("Loaded RMDelegationTokenIdentifier: " + identifier
|
||||||
|
+ " renewDate=" + renewDate);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -649,8 +722,9 @@ public class ZKRMStateStore extends RMStateStore {
|
||||||
getNodePath(parentNodePath, leafNodeName), appIdStr);
|
getNodePath(parentNodePath, leafNodeName), appIdStr);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
} else {
|
} else if (!childNodeName.equals(RM_APP_ROOT_HIERARCHIES)){
|
||||||
LOG.info("Unknown child node with name: " + childNodeName);
|
LOG.debug("Unknown child node with name " + childNodeName + " under" +
|
||||||
|
appRoot);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
if (splitIndex != appIdNodeSplitIndex && !appNodeFound) {
|
if (splitIndex != appIdNodeSplitIndex && !appNodeFound) {
|
||||||
|
@ -683,36 +757,36 @@ public class ZKRMStateStore extends RMStateStore {
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Get parent app node path based on full path and split index supplied.
|
* Get znode path based on full path and split index supplied.
|
||||||
* @param appIdPath App id path for which parent needs to be returned.
|
* @param path path for which parent needs to be returned.
|
||||||
* @param splitIndex split index.
|
* @param splitIndex split index.
|
||||||
* @return parent app node path.
|
* @return parent app node path.
|
||||||
*/
|
*/
|
||||||
private String getSplitAppNodeParent(String appIdPath, int splitIndex) {
|
private String getSplitZnodeParent(String path, int splitIndex) {
|
||||||
// Calculated as string upto index (appIdPath Length - split index - 1). We
|
// Calculated as string up to index (path Length - split index - 1). We
|
||||||
// deduct 1 to exclude path separator.
|
// deduct 1 to exclude path separator.
|
||||||
return appIdPath.substring(0, appIdPath.length() - splitIndex - 1);
|
return path.substring(0, path.length() - splitIndex - 1);
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Checks if parent app node has no leaf nodes and if it does not have,
|
* Checks if parent znode has no leaf nodes and if it does not have,
|
||||||
* removes it. Called while removing application.
|
* removes it.
|
||||||
* @param appIdPath path of app id to be removed.
|
* @param path path of znode to be removed.
|
||||||
* @param splitIndex split index.
|
* @param splitIndex split index.
|
||||||
* @throws Exception if any problem occurs while performing ZK operation.
|
* @throws Exception if any problem occurs while performing ZK operation.
|
||||||
*/
|
*/
|
||||||
private void checkRemoveParentAppNode(String appIdPath, int splitIndex)
|
private void checkRemoveParentZnode(String path, int splitIndex)
|
||||||
throws Exception {
|
throws Exception {
|
||||||
if (splitIndex != 0) {
|
if (splitIndex != 0) {
|
||||||
String parentAppNode = getSplitAppNodeParent(appIdPath, splitIndex);
|
String parentZnode = getSplitZnodeParent(path, splitIndex);
|
||||||
List<String> children = null;
|
List<String> children = null;
|
||||||
try {
|
try {
|
||||||
children = getChildren(parentAppNode);
|
children = getChildren(parentZnode);
|
||||||
} catch (KeeperException.NoNodeException ke) {
|
} catch (KeeperException.NoNodeException ke) {
|
||||||
// It should be fine to swallow this exception as the parent app node we
|
// It should be fine to swallow this exception as the parent znode we
|
||||||
// intend to delete is already deleted.
|
// intend to delete is already deleted.
|
||||||
if (LOG.isDebugEnabled()) {
|
if (LOG.isDebugEnabled()) {
|
||||||
LOG.debug("Unable to remove app parent node " + parentAppNode +
|
LOG.debug("Unable to remove parent node " + parentZnode +
|
||||||
" as it does not exist.");
|
" as it does not exist.");
|
||||||
}
|
}
|
||||||
return;
|
return;
|
||||||
|
@ -720,16 +794,16 @@ public class ZKRMStateStore extends RMStateStore {
|
||||||
// No apps stored under parent path.
|
// No apps stored under parent path.
|
||||||
if (children != null && children.isEmpty()) {
|
if (children != null && children.isEmpty()) {
|
||||||
try {
|
try {
|
||||||
zkManager.safeDelete(parentAppNode, zkAcl, fencingNodePath);
|
zkManager.safeDelete(parentZnode, zkAcl, fencingNodePath);
|
||||||
if (LOG.isDebugEnabled()) {
|
if (LOG.isDebugEnabled()) {
|
||||||
LOG.debug("No leaf app node exists. Removing parent node " +
|
LOG.debug("No leaf znode exists. Removing parent node " +
|
||||||
parentAppNode);
|
parentZnode);
|
||||||
}
|
}
|
||||||
} catch (KeeperException.NotEmptyException ke) {
|
} catch (KeeperException.NotEmptyException ke) {
|
||||||
// It should be fine to swallow this exception as the parent app node
|
// It should be fine to swallow this exception as the parent znode
|
||||||
// has to be deleted only if it has no children. And this node has.
|
// has to be deleted only if it has no children. And this node has.
|
||||||
if (LOG.isDebugEnabled()) {
|
if (LOG.isDebugEnabled()) {
|
||||||
LOG.debug("Unable to remove app parent node " + parentAppNode +
|
LOG.debug("Unable to remove app parent node " + parentZnode +
|
||||||
" as it has children.");
|
" as it has children.");
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -770,7 +844,7 @@ public class ZKRMStateStore extends RMStateStore {
|
||||||
// Look for paths based on other split indices if path as per split index
|
// Look for paths based on other split indices if path as per split index
|
||||||
// does not exist.
|
// does not exist.
|
||||||
if (!exists(nodeUpdatePath)) {
|
if (!exists(nodeUpdatePath)) {
|
||||||
AppNodeSplitInfo alternatePathInfo = getAlternatePath(appId.toString());
|
ZnodeSplitInfo alternatePathInfo = getAlternateAppPath(appId.toString());
|
||||||
if (alternatePathInfo != null) {
|
if (alternatePathInfo != null) {
|
||||||
nodeUpdatePath = alternatePathInfo.path;
|
nodeUpdatePath = alternatePathInfo.path;
|
||||||
} else {
|
} else {
|
||||||
|
@ -778,7 +852,7 @@ public class ZKRMStateStore extends RMStateStore {
|
||||||
pathExists = false;
|
pathExists = false;
|
||||||
if (appIdNodeSplitIndex != 0) {
|
if (appIdNodeSplitIndex != 0) {
|
||||||
String rootNode =
|
String rootNode =
|
||||||
getSplitAppNodeParent(nodeUpdatePath, appIdNodeSplitIndex);
|
getSplitZnodeParent(nodeUpdatePath, appIdNodeSplitIndex);
|
||||||
if (!exists(rootNode)) {
|
if (!exists(rootNode)) {
|
||||||
zkManager.safeCreate(rootNode, null, zkAcl, CreateMode.PERSISTENT,
|
zkManager.safeCreate(rootNode, null, zkAcl, CreateMode.PERSISTENT,
|
||||||
zkAcl, fencingNodePath);
|
zkAcl, fencingNodePath);
|
||||||
|
@ -819,7 +893,7 @@ public class ZKRMStateStore extends RMStateStore {
|
||||||
String appDirPath = getLeafAppIdNodePath(appId, false);
|
String appDirPath = getLeafAppIdNodePath(appId, false);
|
||||||
// Look for paths based on other split indices.
|
// Look for paths based on other split indices.
|
||||||
if (!exists(appDirPath)) {
|
if (!exists(appDirPath)) {
|
||||||
AppNodeSplitInfo alternatePathInfo = getAlternatePath(appId);
|
ZnodeSplitInfo alternatePathInfo = getAlternateAppPath(appId);
|
||||||
if (alternatePathInfo == null) {
|
if (alternatePathInfo == null) {
|
||||||
if (operation == AppAttemptOp.REMOVE) {
|
if (operation == AppAttemptOp.REMOVE) {
|
||||||
// Unexpected. Assume that app attempt has been deleted.
|
// Unexpected. Assume that app attempt has been deleted.
|
||||||
|
@ -918,7 +992,7 @@ public class ZKRMStateStore extends RMStateStore {
|
||||||
// Look for paths based on other split indices if path as per configured
|
// Look for paths based on other split indices if path as per configured
|
||||||
// split index does not exist.
|
// split index does not exist.
|
||||||
if (!exists(appIdRemovePath)) {
|
if (!exists(appIdRemovePath)) {
|
||||||
AppNodeSplitInfo alternatePathInfo = getAlternatePath(removeAppId);
|
ZnodeSplitInfo alternatePathInfo = getAlternateAppPath(removeAppId);
|
||||||
if (alternatePathInfo != null) {
|
if (alternatePathInfo != null) {
|
||||||
appIdRemovePath = alternatePathInfo.path;
|
appIdRemovePath = alternatePathInfo.path;
|
||||||
splitIndex = alternatePathInfo.splitIndex;
|
splitIndex = alternatePathInfo.splitIndex;
|
||||||
|
@ -946,24 +1020,60 @@ public class ZKRMStateStore extends RMStateStore {
|
||||||
forPath(appIdRemovePath);
|
forPath(appIdRemovePath);
|
||||||
}
|
}
|
||||||
// Check if we should remove the parent app node as well.
|
// Check if we should remove the parent app node as well.
|
||||||
checkRemoveParentAppNode(appIdRemovePath, splitIndex);
|
checkRemoveParentZnode(appIdRemovePath, splitIndex);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
protected synchronized void storeRMDelegationTokenState(
|
protected synchronized void storeRMDelegationTokenState(
|
||||||
RMDelegationTokenIdentifier rmDTIdentifier, Long renewDate)
|
RMDelegationTokenIdentifier rmDTIdentifier, Long renewDate)
|
||||||
throws Exception {
|
throws Exception {
|
||||||
SafeTransaction trx = zkManager.createTransaction(zkAcl, fencingNodePath);
|
String nodeCreatePath = getLeafDelegationTokenNodePath(
|
||||||
addStoreOrUpdateOps(trx, rmDTIdentifier, renewDate, false);
|
rmDTIdentifier.getSequenceNumber(), true);
|
||||||
trx.commit();
|
if (LOG.isDebugEnabled()) {
|
||||||
|
LOG.debug("Storing " + DELEGATION_TOKEN_PREFIX
|
||||||
|
+ rmDTIdentifier.getSequenceNumber());
|
||||||
|
}
|
||||||
|
|
||||||
|
RMDelegationTokenIdentifierData identifierData =
|
||||||
|
new RMDelegationTokenIdentifierData(rmDTIdentifier, renewDate);
|
||||||
|
ByteArrayOutputStream seqOs = new ByteArrayOutputStream();
|
||||||
|
try (DataOutputStream seqOut = new DataOutputStream(seqOs)) {
|
||||||
|
SafeTransaction trx = zkManager.createTransaction(zkAcl,
|
||||||
|
fencingNodePath);
|
||||||
|
trx.create(nodeCreatePath, identifierData.toByteArray(), zkAcl,
|
||||||
|
CreateMode.PERSISTENT);
|
||||||
|
// Update Sequence number only while storing DT
|
||||||
|
seqOut.writeInt(rmDTIdentifier.getSequenceNumber());
|
||||||
|
|
||||||
|
if (LOG.isDebugEnabled()) {
|
||||||
|
LOG.debug("Storing " + dtSequenceNumberPath + ". SequenceNumber: "
|
||||||
|
+ rmDTIdentifier.getSequenceNumber());
|
||||||
|
}
|
||||||
|
|
||||||
|
trx.setData(dtSequenceNumberPath, seqOs.toByteArray(), -1);
|
||||||
|
trx.commit();
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
protected synchronized void removeRMDelegationTokenState(
|
protected synchronized void removeRMDelegationTokenState(
|
||||||
RMDelegationTokenIdentifier rmDTIdentifier) throws Exception {
|
RMDelegationTokenIdentifier rmDTIdentifier) throws Exception {
|
||||||
String nodeRemovePath =
|
String nodeRemovePath = getLeafDelegationTokenNodePath(
|
||||||
getNodePath(delegationTokensRootPath, DELEGATION_TOKEN_PREFIX
|
rmDTIdentifier.getSequenceNumber(), false);
|
||||||
+ rmDTIdentifier.getSequenceNumber());
|
int splitIndex = delegationTokenNodeSplitIndex;
|
||||||
|
// Look for paths based on other split indices if path as per configured
|
||||||
|
// split index does not exist.
|
||||||
|
if (!exists(nodeRemovePath)) {
|
||||||
|
ZnodeSplitInfo alternatePathInfo =
|
||||||
|
getAlternateDTPath(rmDTIdentifier.getSequenceNumber());
|
||||||
|
if (alternatePathInfo != null) {
|
||||||
|
nodeRemovePath = alternatePathInfo.path;
|
||||||
|
splitIndex = alternatePathInfo.splitIndex;
|
||||||
|
} else {
|
||||||
|
// Alternate path not found so return.
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
if (LOG.isDebugEnabled()) {
|
if (LOG.isDebugEnabled()) {
|
||||||
LOG.debug("Removing RMDelegationToken_"
|
LOG.debug("Removing RMDelegationToken_"
|
||||||
|
@ -971,62 +1081,41 @@ public class ZKRMStateStore extends RMStateStore {
|
||||||
}
|
}
|
||||||
|
|
||||||
zkManager.safeDelete(nodeRemovePath, zkAcl, fencingNodePath);
|
zkManager.safeDelete(nodeRemovePath, zkAcl, fencingNodePath);
|
||||||
|
|
||||||
|
// Check if we should remove the parent app node as well.
|
||||||
|
checkRemoveParentZnode(nodeRemovePath, splitIndex);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
protected synchronized void updateRMDelegationTokenState(
|
protected synchronized void updateRMDelegationTokenState(
|
||||||
RMDelegationTokenIdentifier rmDTIdentifier, Long renewDate)
|
RMDelegationTokenIdentifier rmDTIdentifier, Long renewDate)
|
||||||
throws Exception {
|
throws Exception {
|
||||||
SafeTransaction trx = zkManager.createTransaction(zkAcl, fencingNodePath);
|
String nodeUpdatePath = getLeafDelegationTokenNodePath(
|
||||||
String nodeRemovePath =
|
rmDTIdentifier.getSequenceNumber(), false);
|
||||||
getNodePath(delegationTokensRootPath, DELEGATION_TOKEN_PREFIX
|
boolean pathExists = true;
|
||||||
+ rmDTIdentifier.getSequenceNumber());
|
// Look for paths based on other split indices if path as per split index
|
||||||
|
// does not exist.
|
||||||
if (exists(nodeRemovePath)) {
|
if (!exists(nodeUpdatePath)) {
|
||||||
// in case znode exists
|
ZnodeSplitInfo alternatePathInfo =
|
||||||
addStoreOrUpdateOps(trx, rmDTIdentifier, renewDate, true);
|
getAlternateDTPath(rmDTIdentifier.getSequenceNumber());
|
||||||
} else {
|
if (alternatePathInfo != null) {
|
||||||
// in case znode doesn't exist
|
nodeUpdatePath = alternatePathInfo.path;
|
||||||
addStoreOrUpdateOps(trx, rmDTIdentifier, renewDate, false);
|
} else {
|
||||||
if (LOG.isDebugEnabled()) {
|
pathExists = false;
|
||||||
LOG.debug("Attempted to update a non-existing znode " + nodeRemovePath);
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
trx.commit();
|
if (pathExists) {
|
||||||
}
|
if (LOG.isDebugEnabled()) {
|
||||||
|
LOG.debug("Updating " + DELEGATION_TOKEN_PREFIX
|
||||||
private void addStoreOrUpdateOps(SafeTransaction trx,
|
+ rmDTIdentifier.getSequenceNumber());
|
||||||
RMDelegationTokenIdentifier rmDTIdentifier, Long renewDate,
|
|
||||||
boolean isUpdate) throws Exception {
|
|
||||||
// store RM delegation token
|
|
||||||
String nodeCreatePath = getNodePath(delegationTokensRootPath,
|
|
||||||
DELEGATION_TOKEN_PREFIX + rmDTIdentifier.getSequenceNumber());
|
|
||||||
RMDelegationTokenIdentifierData identifierData =
|
|
||||||
new RMDelegationTokenIdentifierData(rmDTIdentifier, renewDate);
|
|
||||||
ByteArrayOutputStream seqOs = new ByteArrayOutputStream();
|
|
||||||
|
|
||||||
try (DataOutputStream seqOut = new DataOutputStream(seqOs)) {
|
|
||||||
|
|
||||||
if (isUpdate) {
|
|
||||||
if (LOG.isDebugEnabled()) {
|
|
||||||
LOG.debug("Updating RMDelegationToken_"
|
|
||||||
+ rmDTIdentifier.getSequenceNumber());
|
|
||||||
}
|
|
||||||
trx.setData(nodeCreatePath, identifierData.toByteArray(), -1);
|
|
||||||
} else {
|
|
||||||
trx.create(nodeCreatePath, identifierData.toByteArray(), zkAcl,
|
|
||||||
CreateMode.PERSISTENT);
|
|
||||||
// Update Sequence number only while storing DT
|
|
||||||
seqOut.writeInt(rmDTIdentifier.getSequenceNumber());
|
|
||||||
|
|
||||||
if (LOG.isDebugEnabled()) {
|
|
||||||
LOG.debug("Storing " + dtSequenceNumberPath + ". SequenceNumber: "
|
|
||||||
+ rmDTIdentifier.getSequenceNumber());
|
|
||||||
}
|
|
||||||
|
|
||||||
trx.setData(dtSequenceNumberPath, seqOs.toByteArray(), -1);
|
|
||||||
}
|
}
|
||||||
|
RMDelegationTokenIdentifierData identifierData =
|
||||||
|
new RMDelegationTokenIdentifierData(rmDTIdentifier, renewDate);
|
||||||
|
zkManager.safeSetData(nodeUpdatePath, identifierData.toByteArray(), -1,
|
||||||
|
zkAcl, fencingNodePath);
|
||||||
|
} else {
|
||||||
|
storeRMDelegationTokenState(rmDTIdentifier, renewDate);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1156,19 +1245,19 @@ public class ZKRMStateStore extends RMStateStore {
|
||||||
* Get alternate path for app id if path according to configured split index
|
* Get alternate path for app id if path according to configured split index
|
||||||
* does not exist. We look for path based on all possible split indices.
|
* does not exist. We look for path based on all possible split indices.
|
||||||
* @param appId
|
* @param appId
|
||||||
* @return a {@link AppNodeSplitInfo} object containing the path and split
|
* @return a {@link ZnodeSplitInfo} object containing the path and split
|
||||||
* index if it exists, null otherwise.
|
* index if it exists, null otherwise.
|
||||||
* @throws Exception if any problem occurs while performing ZK operation.
|
* @throws Exception if any problem occurs while performing ZK operation.
|
||||||
*/
|
*/
|
||||||
private AppNodeSplitInfo getAlternatePath(String appId) throws Exception {
|
private ZnodeSplitInfo getAlternateAppPath(String appId) throws Exception {
|
||||||
for (Map.Entry<Integer, String> entry : rmAppRootHierarchies.entrySet()) {
|
for (Map.Entry<Integer, String> entry : rmAppRootHierarchies.entrySet()) {
|
||||||
// Look for other paths
|
// Look for other paths
|
||||||
int splitIndex = entry.getKey();
|
int splitIndex = entry.getKey();
|
||||||
if (splitIndex != appIdNodeSplitIndex) {
|
if (splitIndex != appIdNodeSplitIndex) {
|
||||||
String alternatePath =
|
String alternatePath =
|
||||||
getLeafAppIdNodePath(appId, entry.getValue(), splitIndex, false);
|
getLeafZnodePath(appId, entry.getValue(), splitIndex, false);
|
||||||
if (exists(alternatePath)) {
|
if (exists(alternatePath)) {
|
||||||
return new AppNodeSplitInfo(alternatePath, splitIndex);
|
return new ZnodeSplitInfo(alternatePath, splitIndex);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -1176,26 +1265,25 @@ public class ZKRMStateStore extends RMStateStore {
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Returns leaf app node path based on app id and passed split index. If the
|
* Returns leaf znode path based on node name and passed split index. If the
|
||||||
* passed flag createParentIfNotExists is true, also creates the parent app
|
* passed flag createParentIfNotExists is true, also creates the parent znode
|
||||||
* node if it does not exist.
|
* if it does not exist.
|
||||||
* @param appId application id.
|
* @param nodeName the node name.
|
||||||
* @param rootNode app root node based on split index.
|
* @param rootNode app root node based on split index.
|
||||||
* @param appIdNodeSplitIdx split index.
|
* @param splitIdx split index.
|
||||||
* @param createParentIfNotExists flag which determines if parent app node
|
* @param createParentIfNotExists flag which determines if parent znode
|
||||||
* needs to be created(as per split) if it does not exist.
|
* needs to be created(as per split) if it does not exist.
|
||||||
* @return leaf app node path.
|
* @return leaf znode path.
|
||||||
* @throws Exception if any problem occurs while performing ZK operation.
|
* @throws Exception if any problem occurs while performing ZK operation.
|
||||||
*/
|
*/
|
||||||
private String getLeafAppIdNodePath(String appId, String rootNode,
|
private String getLeafZnodePath(String nodeName, String rootNode,
|
||||||
int appIdNodeSplitIdx, boolean createParentIfNotExists) throws Exception {
|
int splitIdx, boolean createParentIfNotExists) throws Exception {
|
||||||
if (appIdNodeSplitIdx == 0) {
|
if (splitIdx == 0) {
|
||||||
return getNodePath(rootNode, appId);
|
return getNodePath(rootNode, nodeName);
|
||||||
}
|
}
|
||||||
String nodeName = appId;
|
int split = nodeName.length() - splitIdx;
|
||||||
int splitIdx = nodeName.length() - appIdNodeSplitIdx;
|
|
||||||
String rootNodePath =
|
String rootNodePath =
|
||||||
getNodePath(rootNode, nodeName.substring(0, splitIdx));
|
getNodePath(rootNode, nodeName.substring(0, split));
|
||||||
if (createParentIfNotExists && !exists(rootNodePath)) {
|
if (createParentIfNotExists && !exists(rootNodePath)) {
|
||||||
try {
|
try {
|
||||||
zkManager.safeCreate(rootNodePath, null, zkAcl, CreateMode.PERSISTENT,
|
zkManager.safeCreate(rootNodePath, null, zkAcl, CreateMode.PERSISTENT,
|
||||||
|
@ -1207,7 +1295,7 @@ public class ZKRMStateStore extends RMStateStore {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
return getNodePath(rootNodePath, nodeName.substring(splitIdx));
|
return getNodePath(rootNodePath, nodeName.substring(split));
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -1222,10 +1310,77 @@ public class ZKRMStateStore extends RMStateStore {
|
||||||
*/
|
*/
|
||||||
private String getLeafAppIdNodePath(String appId,
|
private String getLeafAppIdNodePath(String appId,
|
||||||
boolean createParentIfNotExists) throws Exception {
|
boolean createParentIfNotExists) throws Exception {
|
||||||
return getLeafAppIdNodePath(appId, rmAppRootHierarchies.get(
|
return getLeafZnodePath(appId, rmAppRootHierarchies.get(
|
||||||
appIdNodeSplitIndex), appIdNodeSplitIndex, createParentIfNotExists);
|
appIdNodeSplitIndex), appIdNodeSplitIndex, createParentIfNotExists);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Returns leaf delegation token node path based on sequence number and
|
||||||
|
* configured split index. If the passed flag createParentIfNotExists is true,
|
||||||
|
* also creates the parent znode if it does not exist. The sequence number
|
||||||
|
* is padded to be at least 4 digits wide to ensure consistency with the split
|
||||||
|
* indexing.
|
||||||
|
* @param rmDTSequenceNumber delegation token sequence number.
|
||||||
|
* @param createParentIfNotExists flag which determines if parent znode
|
||||||
|
* needs to be created(as per split) if it does not exist.
|
||||||
|
* @return leaf delegation token node path.
|
||||||
|
* @throws Exception if any problem occurs while performing ZK operation.
|
||||||
|
*/
|
||||||
|
private String getLeafDelegationTokenNodePath(int rmDTSequenceNumber,
|
||||||
|
boolean createParentIfNotExists) throws Exception {
|
||||||
|
return getLeafDelegationTokenNodePath(rmDTSequenceNumber,
|
||||||
|
createParentIfNotExists, delegationTokenNodeSplitIndex);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Returns leaf delegation token node path based on sequence number and
|
||||||
|
* passed split index. If the passed flag createParentIfNotExists is true,
|
||||||
|
* also creates the parent znode if it does not exist. The sequence number
|
||||||
|
* is padded to be at least 4 digits wide to ensure consistency with the split
|
||||||
|
* indexing.
|
||||||
|
* @param rmDTSequenceNumber delegation token sequence number.
|
||||||
|
* @param createParentIfNotExists flag which determines if parent znode
|
||||||
|
* needs to be created(as per split) if it does not exist.
|
||||||
|
* @param split the split index to use
|
||||||
|
* @return leaf delegation token node path.
|
||||||
|
* @throws Exception if any problem occurs while performing ZK operation.
|
||||||
|
*/
|
||||||
|
private String getLeafDelegationTokenNodePath(int rmDTSequenceNumber,
|
||||||
|
boolean createParentIfNotExists, int split) throws Exception {
|
||||||
|
String nodeName = DELEGATION_TOKEN_PREFIX;
|
||||||
|
if (split == 0) {
|
||||||
|
nodeName += rmDTSequenceNumber;
|
||||||
|
} else {
|
||||||
|
nodeName += String.format("%04d", rmDTSequenceNumber);
|
||||||
|
}
|
||||||
|
return getLeafZnodePath(nodeName, rmDelegationTokenHierarchies.get(split),
|
||||||
|
split, createParentIfNotExists);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Get alternate path for delegation token if path according to configured
|
||||||
|
* split index does not exist. We look for path based on all possible split
|
||||||
|
* indices.
|
||||||
|
* @param rmDTSequenceNumber delegation token sequence number.
|
||||||
|
* @return a {@link ZnodeSplitInfo} object containing the path and split
|
||||||
|
* index if it exists, null otherwise.
|
||||||
|
* @throws Exception if any problem occurs while performing ZK operation.
|
||||||
|
*/
|
||||||
|
private ZnodeSplitInfo getAlternateDTPath(int rmDTSequenceNumber)
|
||||||
|
throws Exception {
|
||||||
|
// Check all possible paths until we find it
|
||||||
|
for (int splitIndex : rmDelegationTokenHierarchies.keySet()) {
|
||||||
|
if (splitIndex != delegationTokenNodeSplitIndex) {
|
||||||
|
String alternatePath = getLeafDelegationTokenNodePath(
|
||||||
|
rmDTSequenceNumber, false, splitIndex);
|
||||||
|
if (exists(alternatePath)) {
|
||||||
|
return new ZnodeSplitInfo(alternatePath, splitIndex);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
|
||||||
@VisibleForTesting
|
@VisibleForTesting
|
||||||
byte[] getData(final String path) throws Exception {
|
byte[] getData(final String path) throws Exception {
|
||||||
return zkManager.getData(path);
|
return zkManager.getData(path);
|
||||||
|
|
|
@ -69,7 +69,6 @@ import org.apache.zookeeper.KeeperException;
|
||||||
import org.apache.zookeeper.ZooDefs.Perms;
|
import org.apache.zookeeper.ZooDefs.Perms;
|
||||||
import org.apache.zookeeper.data.ACL;
|
import org.apache.zookeeper.data.ACL;
|
||||||
import org.junit.After;
|
import org.junit.After;
|
||||||
import org.junit.Assert;
|
|
||||||
import org.junit.Before;
|
import org.junit.Before;
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
|
|
||||||
|
@ -80,16 +79,20 @@ import com.google.common.collect.Lists;
|
||||||
|
|
||||||
import static org.junit.Assert.assertArrayEquals;
|
import static org.junit.Assert.assertArrayEquals;
|
||||||
import static org.junit.Assert.assertEquals;
|
import static org.junit.Assert.assertEquals;
|
||||||
|
import static org.junit.Assert.assertFalse;
|
||||||
import static org.junit.Assert.assertNotNull;
|
import static org.junit.Assert.assertNotNull;
|
||||||
import static org.junit.Assert.assertTrue;
|
import static org.junit.Assert.assertTrue;
|
||||||
|
import static org.junit.Assert.fail;
|
||||||
import static org.mockito.Mockito.mock;
|
import static org.mockito.Mockito.mock;
|
||||||
import static org.mockito.Mockito.spy;
|
import static org.mockito.Mockito.spy;
|
||||||
import static org.mockito.Mockito.when;
|
import static org.mockito.Mockito.when;
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.util.HashMap;
|
import java.util.HashMap;
|
||||||
|
import java.util.HashSet;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
|
import java.util.Set;
|
||||||
|
|
||||||
import javax.crypto.SecretKey;
|
import javax.crypto.SecretKey;
|
||||||
|
|
||||||
|
@ -133,10 +136,9 @@ public class TestZKRMStateStore extends RMStateStoreTestBase {
|
||||||
TestZKRMStateStoreInternal store;
|
TestZKRMStateStoreInternal store;
|
||||||
String workingZnode;
|
String workingZnode;
|
||||||
|
|
||||||
|
|
||||||
class TestZKRMStateStoreInternal extends ZKRMStateStore {
|
class TestZKRMStateStoreInternal extends ZKRMStateStore {
|
||||||
|
|
||||||
public TestZKRMStateStoreInternal(Configuration conf, String workingZnode)
|
TestZKRMStateStoreInternal(Configuration conf, String workingZnode)
|
||||||
throws Exception {
|
throws Exception {
|
||||||
setResourceManager(new ResourceManager());
|
setResourceManager(new ResourceManager());
|
||||||
init(conf);
|
init(conf);
|
||||||
|
@ -145,7 +147,7 @@ public class TestZKRMStateStore extends RMStateStoreTestBase {
|
||||||
assertTrue(znodeWorkingPath.equals(workingZnode));
|
assertTrue(znodeWorkingPath.equals(workingZnode));
|
||||||
}
|
}
|
||||||
|
|
||||||
public String getVersionNode() {
|
private String getVersionNode() {
|
||||||
return znodeWorkingPath + "/" + ROOT_ZNODE_NAME + "/" + VERSION_NODE;
|
return znodeWorkingPath + "/" + ROOT_ZNODE_NAME + "/" + VERSION_NODE;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -167,11 +169,11 @@ public class TestZKRMStateStore extends RMStateStoreTestBase {
|
||||||
return rootPath + "/" + appPath;
|
return rootPath + "/" + appPath;
|
||||||
}
|
}
|
||||||
|
|
||||||
public String getAppNode(String appId) {
|
private String getAppNode(String appId) {
|
||||||
return getAppNode(appId, 0);
|
return getAppNode(appId, 0);
|
||||||
}
|
}
|
||||||
|
|
||||||
public String getAttemptNode(String appId, String attemptId) {
|
private String getAttemptNode(String appId, String attemptId) {
|
||||||
return getAppNode(appId) + "/" + attemptId;
|
return getAppNode(appId) + "/" + attemptId;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -179,10 +181,28 @@ public class TestZKRMStateStore extends RMStateStoreTestBase {
|
||||||
* Emulating retrying createRootDir not to raise NodeExist exception
|
* Emulating retrying createRootDir not to raise NodeExist exception
|
||||||
* @throws Exception
|
* @throws Exception
|
||||||
*/
|
*/
|
||||||
public void testRetryingCreateRootDir() throws Exception {
|
private void testRetryingCreateRootDir() throws Exception {
|
||||||
create(znodeWorkingPath);
|
create(znodeWorkingPath);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private String getDelegationTokenNode(int rmDTSequenceNumber, int splitIdx) {
|
||||||
|
String rootPath = workingZnode + "/" + ROOT_ZNODE_NAME + "/" +
|
||||||
|
RM_DT_SECRET_MANAGER_ROOT + "/" +
|
||||||
|
RMStateStore.RM_DELEGATION_TOKENS_ROOT_ZNODE_NAME;
|
||||||
|
String nodeName = DELEGATION_TOKEN_PREFIX;
|
||||||
|
if (splitIdx == 0) {
|
||||||
|
nodeName += rmDTSequenceNumber;
|
||||||
|
} else {
|
||||||
|
nodeName += String.format("%04d", rmDTSequenceNumber);
|
||||||
|
}
|
||||||
|
String path = nodeName;
|
||||||
|
if (splitIdx != 0) {
|
||||||
|
int idx = nodeName.length() - splitIdx;
|
||||||
|
path = splitIdx + "/" + nodeName.substring(0, idx) + "/"
|
||||||
|
+ nodeName.substring(idx);
|
||||||
|
}
|
||||||
|
return rootPath + "/" + path;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private RMStateStore createStore(Configuration conf) throws Exception {
|
private RMStateStore createStore(Configuration conf) throws Exception {
|
||||||
|
@ -240,6 +260,17 @@ public class TestZKRMStateStore extends RMStateStoreTestBase {
|
||||||
.forPath(store.getAttemptNode(
|
.forPath(store.getAttemptNode(
|
||||||
attemptId.getApplicationId().toString(), attemptId.toString()));
|
attemptId.getApplicationId().toString(), attemptId.toString()));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public boolean delegationTokenExists(RMDelegationTokenIdentifier token,
|
||||||
|
int index) throws Exception {
|
||||||
|
int rmDTSequenceNumber = token.getSequenceNumber();
|
||||||
|
return curatorFramework.checkExists().forPath(
|
||||||
|
store.getDelegationTokenNode(rmDTSequenceNumber, index)) != null;
|
||||||
|
}
|
||||||
|
|
||||||
|
public int getDelegationTokenNodeSplitIndex() {
|
||||||
|
return store.delegationTokenNodeSplitIndex;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test (timeout = 60000)
|
@Test (timeout = 60000)
|
||||||
|
@ -337,7 +368,8 @@ public class TestZKRMStateStore extends RMStateStoreTestBase {
|
||||||
RMStateStore store = zkTester.getRMStateStore();
|
RMStateStore store = zkTester.getRMStateStore();
|
||||||
Version defaultVersion = zkTester.getCurrentVersion();
|
Version defaultVersion = zkTester.getCurrentVersion();
|
||||||
store.checkVersion();
|
store.checkVersion();
|
||||||
Assert.assertEquals(defaultVersion, store.loadVersion());
|
assertEquals("Store had wrong version",
|
||||||
|
defaultVersion, store.loadVersion());
|
||||||
}
|
}
|
||||||
|
|
||||||
public static Configuration createHARMConf(String rmIds, String rmId,
|
public static Configuration createHARMConf(String rmIds, String rmId,
|
||||||
|
@ -551,11 +583,20 @@ public class TestZKRMStateStore extends RMStateStoreTestBase {
|
||||||
new Text("renewer1"), new Text("realuser1"));
|
new Text("renewer1"), new Text("realuser1"));
|
||||||
Long renewDate1 = new Long(System.currentTimeMillis());
|
Long renewDate1 = new Long(System.currentTimeMillis());
|
||||||
dtId1.setSequenceNumber(1111);
|
dtId1.setSequenceNumber(1111);
|
||||||
|
assertFalse("Token " + dtId1
|
||||||
|
+ " should not exist but was found in ZooKeeper",
|
||||||
|
zkTester.delegationTokenExists(dtId1, 0));
|
||||||
store.storeRMDelegationToken(dtId1, renewDate1);
|
store.storeRMDelegationToken(dtId1, renewDate1);
|
||||||
|
assertFalse("Token " + dtId1
|
||||||
|
+ " should not exist but was found in ZooKeeper",
|
||||||
|
zkTester.delegationTokenExists(dtId1, 0));
|
||||||
assertEquals("RMStateStore should have been in fenced state", true,
|
assertEquals("RMStateStore should have been in fenced state", true,
|
||||||
store.isFencedState());
|
store.isFencedState());
|
||||||
|
|
||||||
store.updateRMDelegationToken(dtId1, renewDate1);
|
store.updateRMDelegationToken(dtId1, renewDate1);
|
||||||
|
assertFalse("Token " + dtId1
|
||||||
|
+ " should not exist but was found in ZooKeeper",
|
||||||
|
zkTester.delegationTokenExists(dtId1, 0));
|
||||||
assertEquals("RMStateStore should have been in fenced state", true,
|
assertEquals("RMStateStore should have been in fenced state", true,
|
||||||
store.isFencedState());
|
store.isFencedState());
|
||||||
|
|
||||||
|
@ -611,7 +652,7 @@ public class TestZKRMStateStore extends RMStateStoreTestBase {
|
||||||
try {
|
try {
|
||||||
store.removeApplicationStateInternal(appStateRemoved);
|
store.removeApplicationStateInternal(appStateRemoved);
|
||||||
} catch (KeeperException.NoNodeException nne) {
|
} catch (KeeperException.NoNodeException nne) {
|
||||||
Assert.fail("NoNodeException should not happen.");
|
fail("NoNodeException should not happen.");
|
||||||
}
|
}
|
||||||
store.close();
|
store.close();
|
||||||
}
|
}
|
||||||
|
@ -1129,4 +1170,317 @@ public class TestZKRMStateStore extends RMStateStoreTestBase {
|
||||||
// Close the state store.
|
// Close the state store.
|
||||||
store.close();
|
store.close();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private static Configuration createConfForDelegationTokenNodeSplit(
|
||||||
|
int splitIndex) {
|
||||||
|
YarnConfiguration conf = new YarnConfiguration();
|
||||||
|
conf.setInt(YarnConfiguration.ZK_DELEGATION_TOKEN_NODE_SPLIT_INDEX,
|
||||||
|
splitIndex);
|
||||||
|
return conf;
|
||||||
|
}
|
||||||
|
|
||||||
|
private void verifyDelegationTokensStateStore(
|
||||||
|
TestZKRMStateStoreTester zkTester,
|
||||||
|
Map<RMDelegationTokenIdentifier, Long> tokensWithRenewal,
|
||||||
|
Map<RMDelegationTokenIdentifier, Integer> tokensWithIndex,
|
||||||
|
int sequenceNumber) throws Exception {
|
||||||
|
RMStateStore.RMDTSecretManagerState secretManagerState =
|
||||||
|
zkTester.store.loadState().getRMDTSecretManagerState();
|
||||||
|
assertEquals("Unexpected token state",
|
||||||
|
tokensWithRenewal, secretManagerState.getTokenState());
|
||||||
|
assertEquals("Unexpected sequence number", sequenceNumber,
|
||||||
|
secretManagerState.getDTSequenceNumber());
|
||||||
|
for (Map.Entry<RMDelegationTokenIdentifier, Integer> tokenEntry
|
||||||
|
: tokensWithIndex.entrySet()) {
|
||||||
|
assertTrue("Expected to find token " + tokenEntry.getKey()
|
||||||
|
+ " in zookeeper but did not",
|
||||||
|
zkTester.delegationTokenExists(tokenEntry.getKey(),
|
||||||
|
tokenEntry.getValue()));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private void verifyDelegationTokenInStateStore(
|
||||||
|
TestZKRMStateStoreTester zkTester, RMDelegationTokenIdentifier token,
|
||||||
|
long renewDate, int index) throws Exception {
|
||||||
|
RMStateStore.RMDTSecretManagerState secretManagerState =
|
||||||
|
zkTester.store.loadState().getRMDTSecretManagerState();
|
||||||
|
Map<RMDelegationTokenIdentifier, Long> tokenState =
|
||||||
|
secretManagerState.getTokenState();
|
||||||
|
assertTrue("token state does not contain " + token,
|
||||||
|
tokenState.containsKey(token));
|
||||||
|
assertTrue("token state does not contain a token with renewal " + renewDate,
|
||||||
|
tokenState.containsValue(renewDate));
|
||||||
|
assertTrue("Token " + token + "should exist but was not found in ZooKeeper",
|
||||||
|
zkTester.delegationTokenExists(token, index));
|
||||||
|
}
|
||||||
|
|
||||||
|
private RMDelegationTokenIdentifier storeUpdateAndVerifyDelegationToken(
|
||||||
|
TestZKRMStateStoreTester zkTester,
|
||||||
|
Map<RMDelegationTokenIdentifier, Long> tokensWithRenewal,
|
||||||
|
Map<RMDelegationTokenIdentifier, Integer> tokensWithIndex,
|
||||||
|
int sequenceNumber, int split) throws Exception {
|
||||||
|
// Store token
|
||||||
|
RMDelegationTokenIdentifier token =
|
||||||
|
new RMDelegationTokenIdentifier(new Text("owner"),
|
||||||
|
new Text("renewer"), new Text("realuser"));
|
||||||
|
assertFalse("Token should not exist but was found in ZooKeeper",
|
||||||
|
zkTester.delegationTokenExists(token, split));
|
||||||
|
token.setSequenceNumber(sequenceNumber);
|
||||||
|
Long renewDate = System.currentTimeMillis();
|
||||||
|
zkTester.store.storeRMDelegationToken(token, renewDate);
|
||||||
|
modifyRMDelegationTokenState();
|
||||||
|
tokensWithRenewal.put(token, renewDate);
|
||||||
|
tokensWithIndex.put(token, split);
|
||||||
|
|
||||||
|
// Verify the token
|
||||||
|
verifyDelegationTokensStateStore(zkTester, tokensWithRenewal,
|
||||||
|
tokensWithIndex, sequenceNumber);
|
||||||
|
|
||||||
|
// Update the token
|
||||||
|
renewDate = System.currentTimeMillis();
|
||||||
|
zkTester.store.updateRMDelegationToken(token, renewDate);
|
||||||
|
tokensWithRenewal.put(token, renewDate);
|
||||||
|
tokensWithIndex.put(token, split);
|
||||||
|
|
||||||
|
// Verify updates
|
||||||
|
verifyDelegationTokensStateStore(zkTester, tokensWithRenewal,
|
||||||
|
tokensWithIndex, sequenceNumber);
|
||||||
|
|
||||||
|
return token;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testDelegationTokenSplitIndexConfig() throws Exception {
|
||||||
|
// Valid values
|
||||||
|
TestZKRMStateStoreTester zkTester = new TestZKRMStateStoreTester();
|
||||||
|
zkTester.getRMStateStore(createConfForDelegationTokenNodeSplit(0)).close();
|
||||||
|
assertEquals("Incorrect split index",
|
||||||
|
0, zkTester.getDelegationTokenNodeSplitIndex());
|
||||||
|
zkTester.getRMStateStore(createConfForDelegationTokenNodeSplit(1)).close();
|
||||||
|
assertEquals("Incorrect split index",
|
||||||
|
1, zkTester.getDelegationTokenNodeSplitIndex());
|
||||||
|
zkTester.getRMStateStore(createConfForDelegationTokenNodeSplit(2)).close();
|
||||||
|
assertEquals("Incorrect split index",
|
||||||
|
2, zkTester.getDelegationTokenNodeSplitIndex());
|
||||||
|
zkTester.getRMStateStore(createConfForDelegationTokenNodeSplit(3)).close();
|
||||||
|
assertEquals("Incorrect split index",
|
||||||
|
3, zkTester.getDelegationTokenNodeSplitIndex());
|
||||||
|
zkTester.getRMStateStore(createConfForDelegationTokenNodeSplit(4)).close();
|
||||||
|
assertEquals("Incorrect split index",
|
||||||
|
4, zkTester.getDelegationTokenNodeSplitIndex());
|
||||||
|
|
||||||
|
// Invalid values --> override to 0
|
||||||
|
zkTester.getRMStateStore(createConfForDelegationTokenNodeSplit(-1)).close();
|
||||||
|
assertEquals("Incorrect split index",
|
||||||
|
0, zkTester.getDelegationTokenNodeSplitIndex());
|
||||||
|
zkTester.getRMStateStore(createConfForDelegationTokenNodeSplit(5)).close();
|
||||||
|
assertEquals("Incorrect split index",
|
||||||
|
0, zkTester.getDelegationTokenNodeSplitIndex());
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testDelegationTokenNodeNoSplit() throws Exception {
|
||||||
|
testDelegationTokenNode(0);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testDelegationTokenNodeWithSplitOne() throws Exception {
|
||||||
|
testDelegationTokenNode(1);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testDelegationTokenNodeWithSplitTwo() throws Exception {
|
||||||
|
testDelegationTokenNode(2);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testDelegationTokenNodeWithSplitThree() throws Exception {
|
||||||
|
testDelegationTokenNode(3);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testDelegationTokenNodeWithSplitFour() throws Exception {
|
||||||
|
testDelegationTokenNode(4);
|
||||||
|
}
|
||||||
|
|
||||||
|
public void testDelegationTokenNode(int split) throws Exception {
|
||||||
|
TestZKRMStateStoreTester zkTester = new TestZKRMStateStoreTester();
|
||||||
|
Configuration conf = createConfForDelegationTokenNodeSplit(split);
|
||||||
|
RMStateStore store = zkTester.getRMStateStore(conf);
|
||||||
|
|
||||||
|
// Store the token and verify
|
||||||
|
Map<RMDelegationTokenIdentifier, Long> tokensWithRenewal = new HashMap<>();
|
||||||
|
Map<RMDelegationTokenIdentifier, Integer> tokensWithIndex = new HashMap<>();
|
||||||
|
int sequenceNumber = 0;
|
||||||
|
RMDelegationTokenIdentifier token = storeUpdateAndVerifyDelegationToken(
|
||||||
|
zkTester, tokensWithRenewal, tokensWithIndex, sequenceNumber, split);
|
||||||
|
|
||||||
|
// Delete the token and verify
|
||||||
|
store.removeRMDelegationToken(token);
|
||||||
|
RMStateStore.RMDTSecretManagerState state =
|
||||||
|
store.loadState().getRMDTSecretManagerState();
|
||||||
|
tokensWithRenewal.clear();
|
||||||
|
tokensWithIndex.clear();
|
||||||
|
assertEquals("Unexpected token state",
|
||||||
|
tokensWithRenewal, state.getTokenState());
|
||||||
|
assertEquals("Unexpected sequence number",
|
||||||
|
sequenceNumber, state.getDTSequenceNumber());
|
||||||
|
assertFalse("Token should not exist but was found in ZooKeeper",
|
||||||
|
zkTester.delegationTokenExists(token, split));
|
||||||
|
store.close();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testDelegationTokenNodeWithSplitMultiple() throws Exception {
|
||||||
|
TestZKRMStateStoreTester zkTester = new TestZKRMStateStoreTester();
|
||||||
|
Configuration conf = createConfForDelegationTokenNodeSplit(1);
|
||||||
|
RMStateStore store = zkTester.getRMStateStore(conf);
|
||||||
|
|
||||||
|
// With the split set to 1, we can store 10 tokens under a znode (i.e. 0-9)
|
||||||
|
// Try to store more than 10
|
||||||
|
Map<RMDelegationTokenIdentifier, Long> tokensWithRenewal = new HashMap<>();
|
||||||
|
Map<RMDelegationTokenIdentifier, Integer> tokensWithIndex = new HashMap<>();
|
||||||
|
Set<RMDelegationTokenIdentifier> tokensToDelete = new HashSet<>();
|
||||||
|
int sequenceNumber = 0;
|
||||||
|
for (int i = 0; i <= 12; i++) {
|
||||||
|
RMDelegationTokenIdentifier token =
|
||||||
|
new RMDelegationTokenIdentifier(new Text("owner" + i),
|
||||||
|
new Text("renewer" + i), new Text("realuser" + i));
|
||||||
|
sequenceNumber = i;
|
||||||
|
token.setSequenceNumber(sequenceNumber);
|
||||||
|
assertFalse("Token should not exist but was found in ZooKeeper",
|
||||||
|
zkTester.delegationTokenExists(token, 1));
|
||||||
|
Long renewDate = System.currentTimeMillis();
|
||||||
|
store.storeRMDelegationToken(token, renewDate);
|
||||||
|
modifyRMDelegationTokenState();
|
||||||
|
tokensWithRenewal.put(token, renewDate);
|
||||||
|
tokensWithIndex.put(token, 1);
|
||||||
|
switch (i) {
|
||||||
|
case 0:
|
||||||
|
case 3:
|
||||||
|
case 6:
|
||||||
|
case 11:
|
||||||
|
tokensToDelete.add(token);
|
||||||
|
break;
|
||||||
|
default:
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
// Verify
|
||||||
|
verifyDelegationTokensStateStore(zkTester, tokensWithRenewal,
|
||||||
|
tokensWithIndex, sequenceNumber);
|
||||||
|
|
||||||
|
// Try deleting some tokens and adding some new ones
|
||||||
|
for (RMDelegationTokenIdentifier tokenToDelete : tokensToDelete) {
|
||||||
|
store.removeRMDelegationToken(tokenToDelete);
|
||||||
|
tokensWithRenewal.remove(tokenToDelete);
|
||||||
|
tokensWithIndex.remove(tokenToDelete);
|
||||||
|
}
|
||||||
|
for (int i = 13; i <= 22; i++) {
|
||||||
|
RMDelegationTokenIdentifier token =
|
||||||
|
new RMDelegationTokenIdentifier(new Text("owner" + i),
|
||||||
|
new Text("renewer" + i), new Text("realuser" + i));
|
||||||
|
sequenceNumber = i;
|
||||||
|
token.setSequenceNumber(sequenceNumber);
|
||||||
|
Long renewDate = System.currentTimeMillis();
|
||||||
|
store.storeRMDelegationToken(token, renewDate);
|
||||||
|
modifyRMDelegationTokenState();
|
||||||
|
tokensWithRenewal.put(token, renewDate);
|
||||||
|
tokensWithIndex.put(token, 1);
|
||||||
|
}
|
||||||
|
// Verify
|
||||||
|
verifyDelegationTokensStateStore(zkTester, tokensWithRenewal,
|
||||||
|
tokensWithIndex, sequenceNumber);
|
||||||
|
for (RMDelegationTokenIdentifier token : tokensToDelete) {
|
||||||
|
assertFalse("Token " + token
|
||||||
|
+ " should not exist but was found in ZooKeeper",
|
||||||
|
zkTester.delegationTokenExists(token, 1));
|
||||||
|
}
|
||||||
|
store.close();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testDelegationTokenNodeWithSplitChangeAcrossRestarts()
|
||||||
|
throws Exception {
|
||||||
|
TestZKRMStateStoreTester zkTester = new TestZKRMStateStoreTester();
|
||||||
|
Map<RMDelegationTokenIdentifier, Long> tokensWithRenewal = new HashMap<>();
|
||||||
|
Map<RMDelegationTokenIdentifier, Integer> tokensWithIndex = new HashMap<>();
|
||||||
|
int sequenceNumber = 0;
|
||||||
|
|
||||||
|
// Start the store with index 1
|
||||||
|
Configuration conf = createConfForDelegationTokenNodeSplit(1);
|
||||||
|
RMStateStore store = zkTester.getRMStateStore(conf);
|
||||||
|
// Store a token with index 1
|
||||||
|
RMDelegationTokenIdentifier token1 = storeUpdateAndVerifyDelegationToken(
|
||||||
|
zkTester, tokensWithRenewal, tokensWithIndex, sequenceNumber, 1);
|
||||||
|
store.close();
|
||||||
|
|
||||||
|
// Start the store with index 2
|
||||||
|
conf = createConfForDelegationTokenNodeSplit(2);
|
||||||
|
store = zkTester.getRMStateStore(conf);
|
||||||
|
// Verify token1 is still there and under the /1/ znode
|
||||||
|
verifyDelegationTokenInStateStore(
|
||||||
|
zkTester, token1, tokensWithRenewal.get(token1), 1);
|
||||||
|
// Store a token with index 2
|
||||||
|
sequenceNumber++;
|
||||||
|
RMDelegationTokenIdentifier token2 = storeUpdateAndVerifyDelegationToken(
|
||||||
|
zkTester, tokensWithRenewal, tokensWithIndex, sequenceNumber, 2);
|
||||||
|
// Update and verify token1
|
||||||
|
long renewDate1 = System.currentTimeMillis();
|
||||||
|
zkTester.store.updateRMDelegationToken(token1, renewDate1);
|
||||||
|
tokensWithRenewal.put(token1, renewDate1);
|
||||||
|
verifyDelegationTokenInStateStore(
|
||||||
|
zkTester, token1, tokensWithRenewal.get(token1), 1);
|
||||||
|
store.close();
|
||||||
|
|
||||||
|
// Start the store with index 0
|
||||||
|
conf = createConfForDelegationTokenNodeSplit(0);
|
||||||
|
store = zkTester.getRMStateStore(conf);
|
||||||
|
// Verify token1 is still there and under the /1/ znode
|
||||||
|
verifyDelegationTokenInStateStore(
|
||||||
|
zkTester, token1, tokensWithRenewal.get(token1), 1);
|
||||||
|
// Verify token2 is still there and under the /2/ znode
|
||||||
|
verifyDelegationTokenInStateStore(
|
||||||
|
zkTester, token2, tokensWithRenewal.get(token2), 2);
|
||||||
|
// Store a token with no index
|
||||||
|
sequenceNumber++;
|
||||||
|
RMDelegationTokenIdentifier token0 = storeUpdateAndVerifyDelegationToken(
|
||||||
|
zkTester, tokensWithRenewal, tokensWithIndex, sequenceNumber, 0);
|
||||||
|
store.close();
|
||||||
|
|
||||||
|
// Start the store with index 3
|
||||||
|
conf = createConfForDelegationTokenNodeSplit(3);
|
||||||
|
store = zkTester.getRMStateStore(conf);
|
||||||
|
// Verify token1 is still there and under the /1/ znode
|
||||||
|
verifyDelegationTokenInStateStore(
|
||||||
|
zkTester, token1, tokensWithRenewal.get(token1), 1);
|
||||||
|
// Verify token2 is still there and under the /2/ znode
|
||||||
|
verifyDelegationTokenInStateStore(
|
||||||
|
zkTester, token2, tokensWithRenewal.get(token2), 2);
|
||||||
|
// Verify token0 is still there and under the token root node
|
||||||
|
verifyDelegationTokenInStateStore(
|
||||||
|
zkTester, token0, tokensWithRenewal.get(token0), 0);
|
||||||
|
// Delete all tokens and verify
|
||||||
|
for (RMDelegationTokenIdentifier token : tokensWithRenewal.keySet()) {
|
||||||
|
store.removeRMDelegationToken(token);
|
||||||
|
}
|
||||||
|
tokensWithRenewal.clear();
|
||||||
|
tokensWithIndex.clear();
|
||||||
|
verifyDelegationTokensStateStore(
|
||||||
|
zkTester, tokensWithRenewal, tokensWithIndex, sequenceNumber);
|
||||||
|
assertFalse("Token " + token1
|
||||||
|
+ " should not exist but was found in ZooKeeper",
|
||||||
|
zkTester.delegationTokenExists(token1, 1));
|
||||||
|
assertFalse("Token " + token1
|
||||||
|
+ " should not exist but was found in ZooKeeper",
|
||||||
|
zkTester.delegationTokenExists(token2, 2));
|
||||||
|
assertFalse("Token " + token1
|
||||||
|
+ " should not exist but was found in ZooKeeper",
|
||||||
|
zkTester.delegationTokenExists(token0, 0));
|
||||||
|
// Store a token with index 3
|
||||||
|
sequenceNumber++;
|
||||||
|
storeUpdateAndVerifyDelegationToken(zkTester, tokensWithRenewal,
|
||||||
|
tokensWithIndex, sequenceNumber, 3);
|
||||||
|
store.close();
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue