YARN-3469. ZKRMStateStore: Avoid setting watches that are not required. (Jun Hong via kasha)
(cherry picked from commit e516706b89
)
This commit is contained in:
parent
d3daf9665c
commit
ad99d268e5
|
@ -70,6 +70,9 @@ Release 2.8.0 - UNRELEASED
|
||||||
YARN-3339. TestDockerContainerExecutor should pull a single image and not
|
YARN-3339. TestDockerContainerExecutor should pull a single image and not
|
||||||
the entire centos repository. (Ravindra Kumar Naik via raviprak)
|
the entire centos repository. (Ravindra Kumar Naik via raviprak)
|
||||||
|
|
||||||
|
YARN-3469. ZKRMStateStore: Avoid setting watches that are not required.
|
||||||
|
(Jun Hong via kasha)
|
||||||
|
|
||||||
BUG FIXES
|
BUG FIXES
|
||||||
|
|
||||||
YARN-3197. Confusing log generated by CapacityScheduler. (Varun Saxena
|
YARN-3197. Confusing log generated by CapacityScheduler. (Varun Saxena
|
||||||
|
|
|
@ -391,7 +391,7 @@ public class ZKRMStateStore extends RMStateStore {
|
||||||
String versionNodePath = getNodePath(zkRootNodePath, VERSION_NODE);
|
String versionNodePath = getNodePath(zkRootNodePath, VERSION_NODE);
|
||||||
byte[] data =
|
byte[] data =
|
||||||
((VersionPBImpl) CURRENT_VERSION_INFO).getProto().toByteArray();
|
((VersionPBImpl) CURRENT_VERSION_INFO).getProto().toByteArray();
|
||||||
if (existsWithRetries(versionNodePath, true) != null) {
|
if (existsWithRetries(versionNodePath, false) != null) {
|
||||||
setDataWithRetries(versionNodePath, data, -1);
|
setDataWithRetries(versionNodePath, data, -1);
|
||||||
} else {
|
} else {
|
||||||
createWithRetries(versionNodePath, data, zkAcl, CreateMode.PERSISTENT);
|
createWithRetries(versionNodePath, data, zkAcl, CreateMode.PERSISTENT);
|
||||||
|
@ -402,8 +402,8 @@ public class ZKRMStateStore extends RMStateStore {
|
||||||
protected synchronized Version loadVersion() throws Exception {
|
protected synchronized Version loadVersion() throws Exception {
|
||||||
String versionNodePath = getNodePath(zkRootNodePath, VERSION_NODE);
|
String versionNodePath = getNodePath(zkRootNodePath, VERSION_NODE);
|
||||||
|
|
||||||
if (existsWithRetries(versionNodePath, true) != null) {
|
if (existsWithRetries(versionNodePath, false) != null) {
|
||||||
byte[] data = getDataWithRetries(versionNodePath, true);
|
byte[] data = getDataWithRetries(versionNodePath, false);
|
||||||
Version version =
|
Version version =
|
||||||
new VersionPBImpl(VersionProto.parseFrom(data));
|
new VersionPBImpl(VersionProto.parseFrom(data));
|
||||||
return version;
|
return version;
|
||||||
|
@ -415,9 +415,9 @@ public class ZKRMStateStore extends RMStateStore {
|
||||||
public synchronized long getAndIncrementEpoch() throws Exception {
|
public synchronized long getAndIncrementEpoch() throws Exception {
|
||||||
String epochNodePath = getNodePath(zkRootNodePath, EPOCH_NODE);
|
String epochNodePath = getNodePath(zkRootNodePath, EPOCH_NODE);
|
||||||
long currentEpoch = 0;
|
long currentEpoch = 0;
|
||||||
if (existsWithRetries(epochNodePath, true) != null) {
|
if (existsWithRetries(epochNodePath, false) != null) {
|
||||||
// load current epoch
|
// load current epoch
|
||||||
byte[] data = getDataWithRetries(epochNodePath, true);
|
byte[] data = getDataWithRetries(epochNodePath, false);
|
||||||
Epoch epoch = new EpochPBImpl(EpochProto.parseFrom(data));
|
Epoch epoch = new EpochPBImpl(EpochProto.parseFrom(data));
|
||||||
currentEpoch = epoch.getEpoch();
|
currentEpoch = epoch.getEpoch();
|
||||||
// increment epoch and store it
|
// increment epoch and store it
|
||||||
|
@ -447,7 +447,7 @@ public class ZKRMStateStore extends RMStateStore {
|
||||||
|
|
||||||
private void loadAMRMTokenSecretManagerState(RMState rmState)
|
private void loadAMRMTokenSecretManagerState(RMState rmState)
|
||||||
throws Exception {
|
throws Exception {
|
||||||
byte[] data = getDataWithRetries(amrmTokenSecretManagerRoot, true);
|
byte[] data = getDataWithRetries(amrmTokenSecretManagerRoot, false);
|
||||||
if (data == null) {
|
if (data == null) {
|
||||||
LOG.warn("There is no data saved");
|
LOG.warn("There is no data saved");
|
||||||
return;
|
return;
|
||||||
|
@ -470,10 +470,10 @@ public class ZKRMStateStore extends RMStateStore {
|
||||||
|
|
||||||
private void loadRMDelegationKeyState(RMState rmState) throws Exception {
|
private void loadRMDelegationKeyState(RMState rmState) throws Exception {
|
||||||
List<String> childNodes =
|
List<String> childNodes =
|
||||||
getChildrenWithRetries(dtMasterKeysRootPath, true);
|
getChildrenWithRetries(dtMasterKeysRootPath, false);
|
||||||
for (String childNodeName : childNodes) {
|
for (String childNodeName : childNodes) {
|
||||||
String childNodePath = getNodePath(dtMasterKeysRootPath, childNodeName);
|
String childNodePath = getNodePath(dtMasterKeysRootPath, childNodeName);
|
||||||
byte[] childData = getDataWithRetries(childNodePath, true);
|
byte[] childData = getDataWithRetries(childNodePath, false);
|
||||||
|
|
||||||
if (childData == null) {
|
if (childData == null) {
|
||||||
LOG.warn("Content of " + childNodePath + " is broken.");
|
LOG.warn("Content of " + childNodePath + " is broken.");
|
||||||
|
@ -515,11 +515,11 @@ public class ZKRMStateStore extends RMStateStore {
|
||||||
|
|
||||||
private void loadRMDelegationTokenState(RMState rmState) throws Exception {
|
private void loadRMDelegationTokenState(RMState rmState) throws Exception {
|
||||||
List<String> childNodes =
|
List<String> childNodes =
|
||||||
getChildrenWithRetries(delegationTokensRootPath, true);
|
getChildrenWithRetries(delegationTokensRootPath, false);
|
||||||
for (String childNodeName : childNodes) {
|
for (String childNodeName : childNodes) {
|
||||||
String childNodePath =
|
String childNodePath =
|
||||||
getNodePath(delegationTokensRootPath, childNodeName);
|
getNodePath(delegationTokensRootPath, childNodeName);
|
||||||
byte[] childData = getDataWithRetries(childNodePath, true);
|
byte[] childData = getDataWithRetries(childNodePath, false);
|
||||||
|
|
||||||
if (childData == null) {
|
if (childData == null) {
|
||||||
LOG.warn("Content of " + childNodePath + " is broken.");
|
LOG.warn("Content of " + childNodePath + " is broken.");
|
||||||
|
@ -551,10 +551,10 @@ public class ZKRMStateStore extends RMStateStore {
|
||||||
}
|
}
|
||||||
|
|
||||||
private synchronized void loadRMAppState(RMState rmState) throws Exception {
|
private synchronized void loadRMAppState(RMState rmState) throws Exception {
|
||||||
List<String> childNodes = getChildrenWithRetries(rmAppRoot, true);
|
List<String> childNodes = getChildrenWithRetries(rmAppRoot, false);
|
||||||
for (String childNodeName : childNodes) {
|
for (String childNodeName : childNodes) {
|
||||||
String childNodePath = getNodePath(rmAppRoot, childNodeName);
|
String childNodePath = getNodePath(rmAppRoot, childNodeName);
|
||||||
byte[] childData = getDataWithRetries(childNodePath, true);
|
byte[] childData = getDataWithRetries(childNodePath, false);
|
||||||
if (childNodeName.startsWith(ApplicationId.appIdStrPrefix)) {
|
if (childNodeName.startsWith(ApplicationId.appIdStrPrefix)) {
|
||||||
// application
|
// application
|
||||||
if (LOG.isDebugEnabled()) {
|
if (LOG.isDebugEnabled()) {
|
||||||
|
@ -585,7 +585,7 @@ public class ZKRMStateStore extends RMStateStore {
|
||||||
for (String attemptIDStr : attempts) {
|
for (String attemptIDStr : attempts) {
|
||||||
if (attemptIDStr.startsWith(ApplicationAttemptId.appAttemptIdStrPrefix)) {
|
if (attemptIDStr.startsWith(ApplicationAttemptId.appAttemptIdStrPrefix)) {
|
||||||
String attemptPath = getNodePath(appPath, attemptIDStr);
|
String attemptPath = getNodePath(appPath, attemptIDStr);
|
||||||
byte[] attemptData = getDataWithRetries(attemptPath, true);
|
byte[] attemptData = getDataWithRetries(attemptPath, false);
|
||||||
|
|
||||||
ApplicationAttemptStateDataPBImpl attemptState =
|
ApplicationAttemptStateDataPBImpl attemptState =
|
||||||
new ApplicationAttemptStateDataPBImpl(
|
new ApplicationAttemptStateDataPBImpl(
|
||||||
|
@ -622,7 +622,7 @@ public class ZKRMStateStore extends RMStateStore {
|
||||||
}
|
}
|
||||||
byte[] appStateData = appStateDataPB.getProto().toByteArray();
|
byte[] appStateData = appStateDataPB.getProto().toByteArray();
|
||||||
|
|
||||||
if (existsWithRetries(nodeUpdatePath, true) != null) {
|
if (existsWithRetries(nodeUpdatePath, false) != null) {
|
||||||
setDataWithRetries(nodeUpdatePath, appStateData, -1);
|
setDataWithRetries(nodeUpdatePath, appStateData, -1);
|
||||||
} else {
|
} else {
|
||||||
createWithRetries(nodeUpdatePath, appStateData, zkAcl,
|
createWithRetries(nodeUpdatePath, appStateData, zkAcl,
|
||||||
|
@ -665,7 +665,7 @@ public class ZKRMStateStore extends RMStateStore {
|
||||||
}
|
}
|
||||||
byte[] attemptStateData = attemptStateDataPB.getProto().toByteArray();
|
byte[] attemptStateData = attemptStateDataPB.getProto().toByteArray();
|
||||||
|
|
||||||
if (existsWithRetries(nodeUpdatePath, true) != null) {
|
if (existsWithRetries(nodeUpdatePath, false) != null) {
|
||||||
setDataWithRetries(nodeUpdatePath, attemptStateData, -1);
|
setDataWithRetries(nodeUpdatePath, attemptStateData, -1);
|
||||||
} else {
|
} else {
|
||||||
createWithRetries(nodeUpdatePath, attemptStateData, zkAcl,
|
createWithRetries(nodeUpdatePath, attemptStateData, zkAcl,
|
||||||
|
@ -717,7 +717,7 @@ public class ZKRMStateStore extends RMStateStore {
|
||||||
LOG.debug("Removing RMDelegationToken_"
|
LOG.debug("Removing RMDelegationToken_"
|
||||||
+ rmDTIdentifier.getSequenceNumber());
|
+ rmDTIdentifier.getSequenceNumber());
|
||||||
}
|
}
|
||||||
if (existsWithRetries(nodeRemovePath, true) != null) {
|
if (existsWithRetries(nodeRemovePath, false) != null) {
|
||||||
opList.add(Op.delete(nodeRemovePath, -1));
|
opList.add(Op.delete(nodeRemovePath, -1));
|
||||||
} else {
|
} else {
|
||||||
LOG.debug("Attempted to delete a non-existing znode " + nodeRemovePath);
|
LOG.debug("Attempted to delete a non-existing znode " + nodeRemovePath);
|
||||||
|
@ -733,7 +733,7 @@ public class ZKRMStateStore extends RMStateStore {
|
||||||
String nodeRemovePath =
|
String nodeRemovePath =
|
||||||
getNodePath(delegationTokensRootPath, DELEGATION_TOKEN_PREFIX
|
getNodePath(delegationTokensRootPath, DELEGATION_TOKEN_PREFIX
|
||||||
+ rmDTIdentifier.getSequenceNumber());
|
+ rmDTIdentifier.getSequenceNumber());
|
||||||
if (existsWithRetries(nodeRemovePath, true) == null) {
|
if (existsWithRetries(nodeRemovePath, false) == null) {
|
||||||
// in case znode doesn't exist
|
// in case znode doesn't exist
|
||||||
addStoreOrUpdateOps(opList, rmDTIdentifier, renewDate, false);
|
addStoreOrUpdateOps(opList, rmDTIdentifier, renewDate, false);
|
||||||
LOG.debug("Attempted to update a non-existing znode " + nodeRemovePath);
|
LOG.debug("Attempted to update a non-existing znode " + nodeRemovePath);
|
||||||
|
@ -809,7 +809,7 @@ public class ZKRMStateStore extends RMStateStore {
|
||||||
if (LOG.isDebugEnabled()) {
|
if (LOG.isDebugEnabled()) {
|
||||||
LOG.debug("Removing RMDelegationKey_" + delegationKey.getKeyId());
|
LOG.debug("Removing RMDelegationKey_" + delegationKey.getKeyId());
|
||||||
}
|
}
|
||||||
if (existsWithRetries(nodeRemovePath, true) != null) {
|
if (existsWithRetries(nodeRemovePath, false) != null) {
|
||||||
doMultiWithRetries(Op.delete(nodeRemovePath, -1));
|
doMultiWithRetries(Op.delete(nodeRemovePath, -1));
|
||||||
} else {
|
} else {
|
||||||
LOG.debug("Attempted to delete a non-existing znode " + nodeRemovePath);
|
LOG.debug("Attempted to delete a non-existing znode " + nodeRemovePath);
|
||||||
|
@ -818,8 +818,8 @@ public class ZKRMStateStore extends RMStateStore {
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public synchronized void deleteStore() throws Exception {
|
public synchronized void deleteStore() throws Exception {
|
||||||
if (existsWithRetries(zkRootNodePath, true) != null) {
|
if (existsWithRetries(zkRootNodePath, false) != null) {
|
||||||
deleteWithRetries(zkRootNodePath, true);
|
deleteWithRetries(zkRootNodePath, false);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue