Merge r1587776 from trunk. YARN-1934. Fixed a potential NPE in ZKRMStateStore caused by handling Disconnected event from ZK. Contributed by Karthik Kambatla

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1587778 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Jian He 2014-04-16 02:03:54 +00:00
parent c3c195e9e8
commit 838602d23a
2 changed files with 34 additions and 11 deletions

View File

@ -111,6 +111,9 @@ Release 2.4.1 - UNRELEASED
YARN-1928. Fixed a race condition in TestAMRMRPCNodeUpdates which caused it YARN-1928. Fixed a race condition in TestAMRMRPCNodeUpdates which caused it
to fail occassionally. (Zhijie Shen via vinodkv) to fail occassionally. (Zhijie Shen via vinodkv)
YARN-1934. Fixed a potential NPE in ZKRMStateStore caused by handling
Disconnected event from ZK. (Karthik Kambatla via jianhe)
Release 2.4.0 - 2014-04-07 Release 2.4.0 - 2014-04-07
INCOMPATIBLE CHANGES INCOMPATIBLE CHANGES

View File

@ -280,10 +280,9 @@ public class ZKRMStateStore extends RMStateStore {
} }
} }
private void logRootNodeAcls(String prefix) throws KeeperException, private void logRootNodeAcls(String prefix) throws Exception {
InterruptedException {
Stat getStat = new Stat(); Stat getStat = new Stat();
List<ACL> getAcls = zkClient.getACL(zkRootNodePath, getStat); List<ACL> getAcls = getACLWithRetries(zkRootNodePath, getStat);
StringBuilder builder = new StringBuilder(); StringBuilder builder = new StringBuilder();
builder.append(prefix); builder.append(prefix);
@ -363,7 +362,7 @@ public class ZKRMStateStore extends RMStateStore {
String versionNodePath = getNodePath(zkRootNodePath, VERSION_NODE); String versionNodePath = getNodePath(zkRootNodePath, VERSION_NODE);
byte[] data = byte[] data =
((RMStateVersionPBImpl) CURRENT_VERSION_INFO).getProto().toByteArray(); ((RMStateVersionPBImpl) CURRENT_VERSION_INFO).getProto().toByteArray();
if (zkClient.exists(versionNodePath, true) != null) { if (existsWithRetries(versionNodePath, true) != null) {
setDataWithRetries(versionNodePath, data, -1); setDataWithRetries(versionNodePath, data, -1);
} else { } else {
createWithRetries(versionNodePath, data, zkAcl, CreateMode.PERSISTENT); createWithRetries(versionNodePath, data, zkAcl, CreateMode.PERSISTENT);
@ -374,7 +373,7 @@ public class ZKRMStateStore extends RMStateStore {
protected synchronized RMStateVersion loadVersion() throws Exception { protected synchronized RMStateVersion loadVersion() throws Exception {
String versionNodePath = getNodePath(zkRootNodePath, VERSION_NODE); String versionNodePath = getNodePath(zkRootNodePath, VERSION_NODE);
if (zkClient.exists(versionNodePath, true) != null) { if (existsWithRetries(versionNodePath, true) != null) {
byte[] data = getDataWithRetries(versionNodePath, true); byte[] data = getDataWithRetries(versionNodePath, true);
RMStateVersion version = RMStateVersion version =
new RMStateVersionPBImpl(RMStateVersionProto.parseFrom(data)); new RMStateVersionPBImpl(RMStateVersionProto.parseFrom(data));
@ -442,7 +441,8 @@ public class ZKRMStateStore extends RMStateStore {
} }
private void loadRMDelegationTokenState(RMState rmState) throws Exception { private void loadRMDelegationTokenState(RMState rmState) throws Exception {
List<String> childNodes = zkClient.getChildren(delegationTokensRootPath, true); List<String> childNodes =
getChildrenWithRetries(delegationTokensRootPath, true);
for (String childNodeName : childNodes) { for (String childNodeName : childNodes) {
String childNodePath = String childNodePath =
getNodePath(delegationTokensRootPath, childNodeName); getNodePath(delegationTokensRootPath, childNodeName);
@ -567,7 +567,7 @@ public class ZKRMStateStore extends RMStateStore {
} }
byte[] appStateData = appStateDataPB.getProto().toByteArray(); byte[] appStateData = appStateDataPB.getProto().toByteArray();
if (zkClient.exists(nodeUpdatePath, true) != null) { if (existsWithRetries(nodeUpdatePath, true) != null) {
setDataWithRetries(nodeUpdatePath, appStateData, -1); setDataWithRetries(nodeUpdatePath, appStateData, -1);
} else { } else {
createWithRetries(nodeUpdatePath, appStateData, zkAcl, createWithRetries(nodeUpdatePath, appStateData, zkAcl,
@ -610,7 +610,7 @@ public class ZKRMStateStore extends RMStateStore {
} }
byte[] attemptStateData = attemptStateDataPB.getProto().toByteArray(); byte[] attemptStateData = attemptStateDataPB.getProto().toByteArray();
if (zkClient.exists(nodeUpdatePath, true) != null) { if (existsWithRetries(nodeUpdatePath, true) != null) {
setDataWithRetries(nodeUpdatePath, attemptStateData, -1); setDataWithRetries(nodeUpdatePath, attemptStateData, -1);
} else { } else {
createWithRetries(nodeUpdatePath, attemptStateData, zkAcl, createWithRetries(nodeUpdatePath, attemptStateData, zkAcl,
@ -661,7 +661,7 @@ public class ZKRMStateStore extends RMStateStore {
LOG.debug("Removing RMDelegationToken_" LOG.debug("Removing RMDelegationToken_"
+ rmDTIdentifier.getSequenceNumber()); + rmDTIdentifier.getSequenceNumber());
} }
if (zkClient.exists(nodeRemovePath, true) != null) { if (existsWithRetries(nodeRemovePath, true) != null) {
opList.add(Op.delete(nodeRemovePath, -1)); opList.add(Op.delete(nodeRemovePath, -1));
} else { } else {
LOG.info("Attempted to delete a non-existing znode " + nodeRemovePath); LOG.info("Attempted to delete a non-existing znode " + nodeRemovePath);
@ -677,7 +677,7 @@ public class ZKRMStateStore extends RMStateStore {
String nodeRemovePath = String nodeRemovePath =
getNodePath(delegationTokensRootPath, DELEGATION_TOKEN_PREFIX getNodePath(delegationTokensRootPath, DELEGATION_TOKEN_PREFIX
+ rmDTIdentifier.getSequenceNumber()); + rmDTIdentifier.getSequenceNumber());
if (zkClient.exists(nodeRemovePath, true) == null) { if (existsWithRetries(nodeRemovePath, true) == null) {
// in case znode doesn't exist // in case znode doesn't exist
addStoreOrUpdateOps( addStoreOrUpdateOps(
opList, rmDTIdentifier, renewDate, latestSequenceNumber, false); opList, rmDTIdentifier, renewDate, latestSequenceNumber, false);
@ -760,7 +760,7 @@ public class ZKRMStateStore extends RMStateStore {
if (LOG.isDebugEnabled()) { if (LOG.isDebugEnabled()) {
LOG.debug("Removing RMDelegationKey_" + delegationKey.getKeyId()); LOG.debug("Removing RMDelegationKey_" + delegationKey.getKeyId());
} }
if (zkClient.exists(nodeRemovePath, true) != null) { if (existsWithRetries(nodeRemovePath, true) != null) {
doMultiWithRetries(Op.delete(nodeRemovePath, -1)); doMultiWithRetries(Op.delete(nodeRemovePath, -1));
} else { } else {
LOG.info("Attempted to delete a non-existing znode " + nodeRemovePath); LOG.info("Attempted to delete a non-existing znode " + nodeRemovePath);
@ -891,6 +891,16 @@ public class ZKRMStateStore extends RMStateStore {
}.runWithRetries(); }.runWithRetries();
} }
private List<ACL> getACLWithRetries(
final String path, final Stat stat) throws Exception {
return new ZKAction<List<ACL>>() {
@Override
public List<ACL> run() throws KeeperException, InterruptedException {
return zkClient.getACL(path, stat);
}
}.runWithRetries();
}
private List<String> getChildrenWithRetries( private List<String> getChildrenWithRetries(
final String path, final boolean watch) throws Exception { final String path, final boolean watch) throws Exception {
return new ZKAction<List<String>>() { return new ZKAction<List<String>>() {
@ -901,6 +911,16 @@ public class ZKRMStateStore extends RMStateStore {
}.runWithRetries(); }.runWithRetries();
} }
private Stat existsWithRetries(
final String path, final boolean watch) throws Exception {
return new ZKAction<Stat>() {
@Override
Stat run() throws KeeperException, InterruptedException {
return zkClient.exists(path, watch);
}
}.runWithRetries();
}
/** /**
* Helper class that periodically attempts creating a znode to ensure that * Helper class that periodically attempts creating a znode to ensure that
* this RM continues to be the Active. * this RM continues to be the Active.