From fbd4bbb07bd995c8e871fca59bc40df02f555e0e Mon Sep 17 00:00:00 2001 From: Karthik Kambatla Date: Fri, 10 Apr 2015 11:20:34 -0700 Subject: [PATCH] YARN-3469. ZKRMStateStore: Avoid setting watches that are not required. (Jun Hong via kasha) (cherry picked from commit e516706b896743e47e2852be81944eb5613e3e76) --- hadoop-yarn-project/CHANGES.txt | 3 ++ .../recovery/ZKRMStateStore.java | 40 +++++++++---------- 2 files changed, 23 insertions(+), 20 deletions(-) diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt index 7a178bee535..4d3fc5332dc 100644 --- a/hadoop-yarn-project/CHANGES.txt +++ b/hadoop-yarn-project/CHANGES.txt @@ -16,6 +16,9 @@ Release 2.7.1 - UNRELEASED YARN-3006. Improve the error message when attempting manual failover with auto-failover enabled. (Akira AJISAKA via wangda) + YARN-3469. ZKRMStateStore: Avoid setting watches that are not required. + (Jun Hong via kasha) + BUG FIXES YARN-3487. CapacityScheduler scheduler lock obtained unnecessarily when diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/ZKRMStateStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/ZKRMStateStore.java index 614ef15e43e..e8891a2233d 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/ZKRMStateStore.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/ZKRMStateStore.java @@ -391,7 +391,7 @@ protected synchronized void storeVersion() throws Exception { String versionNodePath = getNodePath(zkRootNodePath, VERSION_NODE); byte[] data = ((VersionPBImpl) CURRENT_VERSION_INFO).getProto().toByteArray(); - if (existsWithRetries(versionNodePath, true) != null) { + if (existsWithRetries(versionNodePath, false) != null) { setDataWithRetries(versionNodePath, data, -1); } else { createWithRetries(versionNodePath, data, zkAcl, CreateMode.PERSISTENT); @@ -402,8 +402,8 @@ protected synchronized void storeVersion() throws Exception { protected synchronized Version loadVersion() throws Exception { String versionNodePath = getNodePath(zkRootNodePath, VERSION_NODE); - if (existsWithRetries(versionNodePath, true) != null) { - byte[] data = getDataWithRetries(versionNodePath, true); + if (existsWithRetries(versionNodePath, false) != null) { + byte[] data = getDataWithRetries(versionNodePath, false); Version version = new VersionPBImpl(VersionProto.parseFrom(data)); return version; @@ -415,9 +415,9 @@ protected synchronized Version loadVersion() throws Exception { public synchronized long getAndIncrementEpoch() throws Exception { String epochNodePath = getNodePath(zkRootNodePath, EPOCH_NODE); long currentEpoch = 0; - if (existsWithRetries(epochNodePath, true) != null) { + if (existsWithRetries(epochNodePath, false) != null) { // load current epoch - byte[] data = getDataWithRetries(epochNodePath, true); + byte[] data = getDataWithRetries(epochNodePath, false); Epoch epoch = new EpochPBImpl(EpochProto.parseFrom(data)); currentEpoch = epoch.getEpoch(); // increment epoch and store it @@ -447,7 +447,7 @@ public synchronized RMState loadState() throws Exception { private void loadAMRMTokenSecretManagerState(RMState rmState) throws Exception { - byte[] data = getDataWithRetries(amrmTokenSecretManagerRoot, true); + byte[] data = getDataWithRetries(amrmTokenSecretManagerRoot, false); if (data == null) { LOG.warn("There is no data saved"); return; @@ -470,10 +470,10 @@ private synchronized void loadRMDTSecretManagerState(RMState rmState) private void loadRMDelegationKeyState(RMState rmState) throws Exception { List childNodes = - getChildrenWithRetries(dtMasterKeysRootPath, true); + getChildrenWithRetries(dtMasterKeysRootPath, false); for (String childNodeName : childNodes) { String childNodePath = getNodePath(dtMasterKeysRootPath, childNodeName); - byte[] childData = getDataWithRetries(childNodePath, true); + byte[] childData = getDataWithRetries(childNodePath, false); if (childData == null) { LOG.warn("Content of " + childNodePath + " is broken."); @@ -515,11 +515,11 @@ private void loadRMSequentialNumberState(RMState rmState) throws Exception { private void loadRMDelegationTokenState(RMState rmState) throws Exception { List childNodes = - getChildrenWithRetries(delegationTokensRootPath, true); + getChildrenWithRetries(delegationTokensRootPath, false); for (String childNodeName : childNodes) { String childNodePath = getNodePath(delegationTokensRootPath, childNodeName); - byte[] childData = getDataWithRetries(childNodePath, true); + byte[] childData = getDataWithRetries(childNodePath, false); if (childData == null) { LOG.warn("Content of " + childNodePath + " is broken."); @@ -551,10 +551,10 @@ private void loadRMDelegationTokenState(RMState rmState) throws Exception { } private synchronized void loadRMAppState(RMState rmState) throws Exception { - List childNodes = getChildrenWithRetries(rmAppRoot, true); + List childNodes = getChildrenWithRetries(rmAppRoot, false); for (String childNodeName : childNodes) { String childNodePath = getNodePath(rmAppRoot, childNodeName); - byte[] childData = getDataWithRetries(childNodePath, true); + byte[] childData = getDataWithRetries(childNodePath, false); if (childNodeName.startsWith(ApplicationId.appIdStrPrefix)) { // application if (LOG.isDebugEnabled()) { @@ -585,7 +585,7 @@ private void loadApplicationAttemptState(ApplicationStateData appState, for (String attemptIDStr : attempts) { if (attemptIDStr.startsWith(ApplicationAttemptId.appAttemptIdStrPrefix)) { String attemptPath = getNodePath(appPath, attemptIDStr); - byte[] attemptData = getDataWithRetries(attemptPath, true); + byte[] attemptData = getDataWithRetries(attemptPath, false); ApplicationAttemptStateDataPBImpl attemptState = new ApplicationAttemptStateDataPBImpl( @@ -622,7 +622,7 @@ public synchronized void updateApplicationStateInternal(ApplicationId appId, } byte[] appStateData = appStateDataPB.getProto().toByteArray(); - if (existsWithRetries(nodeUpdatePath, true) != null) { + if (existsWithRetries(nodeUpdatePath, false) != null) { setDataWithRetries(nodeUpdatePath, appStateData, -1); } else { createWithRetries(nodeUpdatePath, appStateData, zkAcl, @@ -665,7 +665,7 @@ public synchronized void updateApplicationAttemptStateInternal( } byte[] attemptStateData = attemptStateDataPB.getProto().toByteArray(); - if (existsWithRetries(nodeUpdatePath, true) != null) { + if (existsWithRetries(nodeUpdatePath, false) != null) { setDataWithRetries(nodeUpdatePath, attemptStateData, -1); } else { createWithRetries(nodeUpdatePath, attemptStateData, zkAcl, @@ -717,7 +717,7 @@ protected synchronized void removeRMDelegationTokenState( LOG.debug("Removing RMDelegationToken_" + rmDTIdentifier.getSequenceNumber()); } - if (existsWithRetries(nodeRemovePath, true) != null) { + if (existsWithRetries(nodeRemovePath, false) != null) { opList.add(Op.delete(nodeRemovePath, -1)); } else { LOG.debug("Attempted to delete a non-existing znode " + nodeRemovePath); @@ -733,7 +733,7 @@ protected synchronized void updateRMDelegationTokenState( String nodeRemovePath = getNodePath(delegationTokensRootPath, DELEGATION_TOKEN_PREFIX + rmDTIdentifier.getSequenceNumber()); - if (existsWithRetries(nodeRemovePath, true) == null) { + if (existsWithRetries(nodeRemovePath, false) == null) { // in case znode doesn't exist addStoreOrUpdateOps(opList, rmDTIdentifier, renewDate, false); LOG.debug("Attempted to update a non-existing znode " + nodeRemovePath); @@ -809,7 +809,7 @@ protected synchronized void removeRMDTMasterKeyState( if (LOG.isDebugEnabled()) { LOG.debug("Removing RMDelegationKey_" + delegationKey.getKeyId()); } - if (existsWithRetries(nodeRemovePath, true) != null) { + if (existsWithRetries(nodeRemovePath, false) != null) { doMultiWithRetries(Op.delete(nodeRemovePath, -1)); } else { LOG.debug("Attempted to delete a non-existing znode " + nodeRemovePath); @@ -818,8 +818,8 @@ protected synchronized void removeRMDTMasterKeyState( @Override public synchronized void deleteStore() throws Exception { - if (existsWithRetries(zkRootNodePath, true) != null) { - deleteWithRetries(zkRootNodePath, true); + if (existsWithRetries(zkRootNodePath, false) != null) { + deleteWithRetries(zkRootNodePath, false); } }