YARN-3469. ZKRMStateStore: Avoid setting watches that are not required. (Jun Hong via kasha)

This commit is contained in:
Karthik Kambatla 2015-04-10 11:20:34 -07:00
parent 577d755e4b
commit e516706b89
2 changed files with 23 additions and 20 deletions

View File

@ -118,6 +118,9 @@ Release 2.8.0 - UNRELEASED
YARN-3339. TestDockerContainerExecutor should pull a single image and not
the entire centos repository. (Ravindra Kumar Naik via raviprak)
YARN-3469. ZKRMStateStore: Avoid setting watches that are not required.
(Jun Hong via kasha)
BUG FIXES
YARN-3197. Confusing log generated by CapacityScheduler. (Varun Saxena

View File

@ -391,7 +391,7 @@ public class ZKRMStateStore extends RMStateStore {
String versionNodePath = getNodePath(zkRootNodePath, VERSION_NODE);
byte[] data =
((VersionPBImpl) CURRENT_VERSION_INFO).getProto().toByteArray();
if (existsWithRetries(versionNodePath, true) != null) {
if (existsWithRetries(versionNodePath, false) != null) {
setDataWithRetries(versionNodePath, data, -1);
} else {
createWithRetries(versionNodePath, data, zkAcl, CreateMode.PERSISTENT);
@ -402,8 +402,8 @@ public class ZKRMStateStore extends RMStateStore {
protected synchronized Version loadVersion() throws Exception {
String versionNodePath = getNodePath(zkRootNodePath, VERSION_NODE);
if (existsWithRetries(versionNodePath, true) != null) {
byte[] data = getDataWithRetries(versionNodePath, true);
if (existsWithRetries(versionNodePath, false) != null) {
byte[] data = getDataWithRetries(versionNodePath, false);
Version version =
new VersionPBImpl(VersionProto.parseFrom(data));
return version;
@ -415,9 +415,9 @@ public class ZKRMStateStore extends RMStateStore {
public synchronized long getAndIncrementEpoch() throws Exception {
String epochNodePath = getNodePath(zkRootNodePath, EPOCH_NODE);
long currentEpoch = 0;
if (existsWithRetries(epochNodePath, true) != null) {
if (existsWithRetries(epochNodePath, false) != null) {
// load current epoch
byte[] data = getDataWithRetries(epochNodePath, true);
byte[] data = getDataWithRetries(epochNodePath, false);
Epoch epoch = new EpochPBImpl(EpochProto.parseFrom(data));
currentEpoch = epoch.getEpoch();
// increment epoch and store it
@ -447,7 +447,7 @@ public class ZKRMStateStore extends RMStateStore {
private void loadAMRMTokenSecretManagerState(RMState rmState)
throws Exception {
byte[] data = getDataWithRetries(amrmTokenSecretManagerRoot, true);
byte[] data = getDataWithRetries(amrmTokenSecretManagerRoot, false);
if (data == null) {
LOG.warn("There is no data saved");
return;
@ -470,10 +470,10 @@ public class ZKRMStateStore extends RMStateStore {
private void loadRMDelegationKeyState(RMState rmState) throws Exception {
List<String> childNodes =
getChildrenWithRetries(dtMasterKeysRootPath, true);
getChildrenWithRetries(dtMasterKeysRootPath, false);
for (String childNodeName : childNodes) {
String childNodePath = getNodePath(dtMasterKeysRootPath, childNodeName);
byte[] childData = getDataWithRetries(childNodePath, true);
byte[] childData = getDataWithRetries(childNodePath, false);
if (childData == null) {
LOG.warn("Content of " + childNodePath + " is broken.");
@ -515,11 +515,11 @@ public class ZKRMStateStore extends RMStateStore {
private void loadRMDelegationTokenState(RMState rmState) throws Exception {
List<String> childNodes =
getChildrenWithRetries(delegationTokensRootPath, true);
getChildrenWithRetries(delegationTokensRootPath, false);
for (String childNodeName : childNodes) {
String childNodePath =
getNodePath(delegationTokensRootPath, childNodeName);
byte[] childData = getDataWithRetries(childNodePath, true);
byte[] childData = getDataWithRetries(childNodePath, false);
if (childData == null) {
LOG.warn("Content of " + childNodePath + " is broken.");
@ -551,10 +551,10 @@ public class ZKRMStateStore extends RMStateStore {
}
private synchronized void loadRMAppState(RMState rmState) throws Exception {
List<String> childNodes = getChildrenWithRetries(rmAppRoot, true);
List<String> childNodes = getChildrenWithRetries(rmAppRoot, false);
for (String childNodeName : childNodes) {
String childNodePath = getNodePath(rmAppRoot, childNodeName);
byte[] childData = getDataWithRetries(childNodePath, true);
byte[] childData = getDataWithRetries(childNodePath, false);
if (childNodeName.startsWith(ApplicationId.appIdStrPrefix)) {
// application
if (LOG.isDebugEnabled()) {
@ -585,7 +585,7 @@ public class ZKRMStateStore extends RMStateStore {
for (String attemptIDStr : attempts) {
if (attemptIDStr.startsWith(ApplicationAttemptId.appAttemptIdStrPrefix)) {
String attemptPath = getNodePath(appPath, attemptIDStr);
byte[] attemptData = getDataWithRetries(attemptPath, true);
byte[] attemptData = getDataWithRetries(attemptPath, false);
ApplicationAttemptStateDataPBImpl attemptState =
new ApplicationAttemptStateDataPBImpl(
@ -622,7 +622,7 @@ public class ZKRMStateStore extends RMStateStore {
}
byte[] appStateData = appStateDataPB.getProto().toByteArray();
if (existsWithRetries(nodeUpdatePath, true) != null) {
if (existsWithRetries(nodeUpdatePath, false) != null) {
setDataWithRetries(nodeUpdatePath, appStateData, -1);
} else {
createWithRetries(nodeUpdatePath, appStateData, zkAcl,
@ -665,7 +665,7 @@ public class ZKRMStateStore extends RMStateStore {
}
byte[] attemptStateData = attemptStateDataPB.getProto().toByteArray();
if (existsWithRetries(nodeUpdatePath, true) != null) {
if (existsWithRetries(nodeUpdatePath, false) != null) {
setDataWithRetries(nodeUpdatePath, attemptStateData, -1);
} else {
createWithRetries(nodeUpdatePath, attemptStateData, zkAcl,
@ -717,7 +717,7 @@ public class ZKRMStateStore extends RMStateStore {
LOG.debug("Removing RMDelegationToken_"
+ rmDTIdentifier.getSequenceNumber());
}
if (existsWithRetries(nodeRemovePath, true) != null) {
if (existsWithRetries(nodeRemovePath, false) != null) {
opList.add(Op.delete(nodeRemovePath, -1));
} else {
LOG.debug("Attempted to delete a non-existing znode " + nodeRemovePath);
@ -733,7 +733,7 @@ public class ZKRMStateStore extends RMStateStore {
String nodeRemovePath =
getNodePath(delegationTokensRootPath, DELEGATION_TOKEN_PREFIX
+ rmDTIdentifier.getSequenceNumber());
if (existsWithRetries(nodeRemovePath, true) == null) {
if (existsWithRetries(nodeRemovePath, false) == null) {
// in case znode doesn't exist
addStoreOrUpdateOps(opList, rmDTIdentifier, renewDate, false);
LOG.debug("Attempted to update a non-existing znode " + nodeRemovePath);
@ -809,7 +809,7 @@ public class ZKRMStateStore extends RMStateStore {
if (LOG.isDebugEnabled()) {
LOG.debug("Removing RMDelegationKey_" + delegationKey.getKeyId());
}
if (existsWithRetries(nodeRemovePath, true) != null) {
if (existsWithRetries(nodeRemovePath, false) != null) {
doMultiWithRetries(Op.delete(nodeRemovePath, -1));
} else {
LOG.debug("Attempted to delete a non-existing znode " + nodeRemovePath);
@ -818,8 +818,8 @@ public class ZKRMStateStore extends RMStateStore {
@Override
public synchronized void deleteStore() throws Exception {
if (existsWithRetries(zkRootNodePath, true) != null) {
deleteWithRetries(zkRootNodePath, true);
if (existsWithRetries(zkRootNodePath, false) != null) {
deleteWithRetries(zkRootNodePath, false);
}
}