diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt index 36e4bd46d43..68e4dd64729 100644 --- a/hadoop-yarn-project/CHANGES.txt +++ b/hadoop-yarn-project/CHANGES.txt @@ -111,6 +111,9 @@ Release 2.4.1 - UNRELEASED YARN-1928. Fixed a race condition in TestAMRMRPCNodeUpdates which caused it to fail occassionally. (Zhijie Shen via vinodkv) + YARN-1934. Fixed a potential NPE in ZKRMStateStore caused by handling + Disconnected event from ZK. (Karthik Kambatla via jianhe) + Release 2.4.0 - 2014-04-07 INCOMPATIBLE CHANGES diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/ZKRMStateStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/ZKRMStateStore.java index 84eea39d21c..9b15bb21e7e 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/ZKRMStateStore.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/ZKRMStateStore.java @@ -280,10 +280,9 @@ public class ZKRMStateStore extends RMStateStore { } } - private void logRootNodeAcls(String prefix) throws KeeperException, - InterruptedException { + private void logRootNodeAcls(String prefix) throws Exception { Stat getStat = new Stat(); - List getAcls = zkClient.getACL(zkRootNodePath, getStat); + List getAcls = getACLWithRetries(zkRootNodePath, getStat); StringBuilder builder = new StringBuilder(); builder.append(prefix); @@ -363,7 +362,7 @@ public class ZKRMStateStore extends RMStateStore { String versionNodePath = getNodePath(zkRootNodePath, VERSION_NODE); byte[] data = ((RMStateVersionPBImpl) CURRENT_VERSION_INFO).getProto().toByteArray(); - if (zkClient.exists(versionNodePath, true) != null) { + if (existsWithRetries(versionNodePath, true) != null) { setDataWithRetries(versionNodePath, data, -1); } else { createWithRetries(versionNodePath, data, zkAcl, CreateMode.PERSISTENT); @@ -374,7 +373,7 @@ public class ZKRMStateStore extends RMStateStore { protected synchronized RMStateVersion loadVersion() throws Exception { String versionNodePath = getNodePath(zkRootNodePath, VERSION_NODE); - if (zkClient.exists(versionNodePath, true) != null) { + if (existsWithRetries(versionNodePath, true) != null) { byte[] data = getDataWithRetries(versionNodePath, true); RMStateVersion version = new RMStateVersionPBImpl(RMStateVersionProto.parseFrom(data)); @@ -442,7 +441,8 @@ public class ZKRMStateStore extends RMStateStore { } private void loadRMDelegationTokenState(RMState rmState) throws Exception { - List childNodes = zkClient.getChildren(delegationTokensRootPath, true); + List childNodes = + getChildrenWithRetries(delegationTokensRootPath, true); for (String childNodeName : childNodes) { String childNodePath = getNodePath(delegationTokensRootPath, childNodeName); @@ -567,7 +567,7 @@ public class ZKRMStateStore extends RMStateStore { } byte[] appStateData = appStateDataPB.getProto().toByteArray(); - if (zkClient.exists(nodeUpdatePath, true) != null) { + if (existsWithRetries(nodeUpdatePath, true) != null) { setDataWithRetries(nodeUpdatePath, appStateData, -1); } else { createWithRetries(nodeUpdatePath, appStateData, zkAcl, @@ -610,7 +610,7 @@ public class ZKRMStateStore extends RMStateStore { } byte[] attemptStateData = attemptStateDataPB.getProto().toByteArray(); - if (zkClient.exists(nodeUpdatePath, true) != null) { + if (existsWithRetries(nodeUpdatePath, true) != null) { setDataWithRetries(nodeUpdatePath, attemptStateData, -1); } else { createWithRetries(nodeUpdatePath, attemptStateData, zkAcl, @@ -661,7 +661,7 @@ public class ZKRMStateStore extends RMStateStore { LOG.debug("Removing RMDelegationToken_" + rmDTIdentifier.getSequenceNumber()); } - if (zkClient.exists(nodeRemovePath, true) != null) { + if (existsWithRetries(nodeRemovePath, true) != null) { opList.add(Op.delete(nodeRemovePath, -1)); } else { LOG.info("Attempted to delete a non-existing znode " + nodeRemovePath); @@ -677,7 +677,7 @@ public class ZKRMStateStore extends RMStateStore { String nodeRemovePath = getNodePath(delegationTokensRootPath, DELEGATION_TOKEN_PREFIX + rmDTIdentifier.getSequenceNumber()); - if (zkClient.exists(nodeRemovePath, true) == null) { + if (existsWithRetries(nodeRemovePath, true) == null) { // in case znode doesn't exist addStoreOrUpdateOps( opList, rmDTIdentifier, renewDate, latestSequenceNumber, false); @@ -760,7 +760,7 @@ public class ZKRMStateStore extends RMStateStore { if (LOG.isDebugEnabled()) { LOG.debug("Removing RMDelegationKey_" + delegationKey.getKeyId()); } - if (zkClient.exists(nodeRemovePath, true) != null) { + if (existsWithRetries(nodeRemovePath, true) != null) { doMultiWithRetries(Op.delete(nodeRemovePath, -1)); } else { LOG.info("Attempted to delete a non-existing znode " + nodeRemovePath); @@ -891,6 +891,16 @@ public class ZKRMStateStore extends RMStateStore { }.runWithRetries(); } + private List getACLWithRetries( + final String path, final Stat stat) throws Exception { + return new ZKAction>() { + @Override + public List run() throws KeeperException, InterruptedException { + return zkClient.getACL(path, stat); + } + }.runWithRetries(); + } + private List getChildrenWithRetries( final String path, final boolean watch) throws Exception { return new ZKAction>() { @@ -901,6 +911,16 @@ public class ZKRMStateStore extends RMStateStore { }.runWithRetries(); } + private Stat existsWithRetries( + final String path, final boolean watch) throws Exception { + return new ZKAction() { + @Override + Stat run() throws KeeperException, InterruptedException { + return zkClient.exists(path, watch); + } + }.runWithRetries(); + } + /** * Helper class that periodically attempts creating a znode to ensure that * this RM continues to be the Active.