YARN-5400. Light cleanup in ZKRMStateStore (templedf via rkanter)

(cherry picked from commit bcb2528a51)
This commit is contained in:
Robert Kanter 2016-09-28 14:56:41 -07:00
parent baf8aac05c
commit bc91e33d5e
3 changed files with 161 additions and 120 deletions

View File

@ -18,6 +18,7 @@
package org.apache.hadoop.yarn.server.resourcemanager; package org.apache.hadoop.yarn.server.resourcemanager;
import java.io.IOException;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceAudience;
@ -37,9 +38,12 @@ public class RMZKUtils {
private static final Log LOG = LogFactory.getLog(RMZKUtils.class); private static final Log LOG = LogFactory.getLog(RMZKUtils.class);
/** /**
* Utility method to fetch the ZK ACLs from the configuration * Utility method to fetch the ZK ACLs from the configuration.
*
* @throws java.io.IOException if the Zookeeper ACLs configuration file
* cannot be read
*/ */
public static List<ACL> getZKAcls(Configuration conf) throws Exception { public static List<ACL> getZKAcls(Configuration conf) throws IOException {
// Parse authentication from configuration. // Parse authentication from configuration.
String zkAclConf = String zkAclConf =
conf.get(YarnConfiguration.RM_ZK_ACL, conf.get(YarnConfiguration.RM_ZK_ACL,
@ -47,17 +51,20 @@ public class RMZKUtils {
try { try {
zkAclConf = ZKUtil.resolveConfIndirection(zkAclConf); zkAclConf = ZKUtil.resolveConfIndirection(zkAclConf);
return ZKUtil.parseACLs(zkAclConf); return ZKUtil.parseACLs(zkAclConf);
} catch (Exception e) { } catch (IOException | ZKUtil.BadAclFormatException e) {
LOG.error("Couldn't read ACLs based on " + YarnConfiguration.RM_ZK_ACL); LOG.error("Couldn't read ACLs based on " + YarnConfiguration.RM_ZK_ACL);
throw e; throw e;
} }
} }
/** /**
* Utility method to fetch ZK auth info from the configuration * Utility method to fetch ZK auth info from the configuration.
*
* @throws java.io.IOException if the Zookeeper ACLs configuration file
* cannot be read
*/ */
public static List<ZKUtil.ZKAuthInfo> getZKAuths(Configuration conf) public static List<ZKUtil.ZKAuthInfo> getZKAuths(Configuration conf)
throws Exception { throws IOException {
String zkAuthConf = conf.get(YarnConfiguration.RM_ZK_AUTH); String zkAuthConf = conf.get(YarnConfiguration.RM_ZK_AUTH);
try { try {
zkAuthConf = ZKUtil.resolveConfIndirection(zkAuthConf); zkAuthConf = ZKUtil.resolveConfIndirection(zkAuthConf);
@ -66,7 +73,7 @@ public class RMZKUtils {
} else { } else {
return Collections.emptyList(); return Collections.emptyList();
} }
} catch (Exception e) { } catch (IOException | ZKUtil.BadAuthFormatException e) {
LOG.error("Couldn't read Auth based on " + YarnConfiguration.RM_ZK_AUTH); LOG.error("Couldn't read Auth based on " + YarnConfiguration.RM_ZK_AUTH);
throw e; throw e;
} }

View File

@ -305,7 +305,7 @@ public class ResourceManager extends CompositeService implements Recoverable {
} }
public CuratorFramework createAndStartCurator(Configuration conf) public CuratorFramework createAndStartCurator(Configuration conf)
throws Exception { throws IOException {
String zkHostPort = conf.get(YarnConfiguration.RM_ZK_ADDRESS); String zkHostPort = conf.get(YarnConfiguration.RM_ZK_ADDRESS);
if (zkHostPort == null) { if (zkHostPort == null) {
throw new YarnRuntimeException( throw new YarnRuntimeException(

View File

@ -56,7 +56,6 @@ import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.AM
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.ApplicationAttemptStateDataPBImpl; import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.ApplicationAttemptStateDataPBImpl;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.ApplicationStateDataPBImpl; import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.ApplicationStateDataPBImpl;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.EpochPBImpl; import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.EpochPBImpl;
import org.apache.hadoop.yarn.util.ConverterUtils;
import org.apache.zookeeper.CreateMode; import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.ZooDefs; import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.data.ACL; import org.apache.zookeeper.data.ACL;
@ -68,8 +67,8 @@ import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream; import java.io.ByteArrayOutputStream;
import java.io.DataInputStream; import java.io.DataInputStream;
import java.io.DataOutputStream; import java.io.DataOutputStream;
import java.io.IOException;
import java.security.NoSuchAlgorithmException; import java.security.NoSuchAlgorithmException;
import java.security.SecureRandom;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.HashMap; import java.util.HashMap;
import java.util.List; import java.util.List;
@ -121,21 +120,18 @@ import java.util.List;
@Private @Private
@Unstable @Unstable
public class ZKRMStateStore extends RMStateStore { public class ZKRMStateStore extends RMStateStore {
private static final Log LOG = LogFactory.getLog(ZKRMStateStore.class);
public static final Log LOG = LogFactory.getLog(ZKRMStateStore.class);
private final SecureRandom random = new SecureRandom();
protected static final String ROOT_ZNODE_NAME = "ZKRMStateRoot";
protected static final Version CURRENT_VERSION_INFO = Version
.newInstance(1, 3);
private static final String RM_DELEGATION_TOKENS_ROOT_ZNODE_NAME = private static final String RM_DELEGATION_TOKENS_ROOT_ZNODE_NAME =
"RMDelegationTokensRoot"; "RMDelegationTokensRoot";
private static final String RM_DT_SEQUENTIAL_NUMBER_ZNODE_NAME = private static final String RM_DT_SEQUENTIAL_NUMBER_ZNODE_NAME =
"RMDTSequentialNumber"; "RMDTSequentialNumber";
private static final String RM_DT_MASTER_KEYS_ROOT_ZNODE_NAME = private static final String RM_DT_MASTER_KEYS_ROOT_ZNODE_NAME =
"RMDTMasterKeysRoot"; "RMDTMasterKeysRoot";
protected static final String ROOT_ZNODE_NAME = "ZKRMStateRoot";
protected static final Version CURRENT_VERSION_INFO =
Version.newInstance(1, 3);
/** Znode paths */ /* Znode paths */
private String zkRootNodePath; private String zkRootNodePath;
private String rmAppRoot; private String rmAppRoot;
private String rmDTSecretManagerRoot; private String rmDTSecretManagerRoot;
@ -144,44 +140,54 @@ public class ZKRMStateStore extends RMStateStore {
private String dtSequenceNumberPath; private String dtSequenceNumberPath;
private String amrmTokenSecretManagerRoot; private String amrmTokenSecretManagerRoot;
private String reservationRoot; private String reservationRoot;
@VisibleForTesting @VisibleForTesting
protected String znodeWorkingPath; protected String znodeWorkingPath;
/** Fencing related variables */ /* Fencing related variables */
private static final String FENCING_LOCK = "RM_ZK_FENCING_LOCK"; private static final String FENCING_LOCK = "RM_ZK_FENCING_LOCK";
private String fencingNodePath; private String fencingNodePath;
private Thread verifyActiveStatusThread; private Thread verifyActiveStatusThread;
private int zkSessionTimeout; private int zkSessionTimeout;
/** ACL and auth info */ /* ACL and auth info */
private List<ACL> zkAcl; private List<ACL> zkAcl;
@VisibleForTesting @VisibleForTesting
List<ACL> zkRootNodeAcl; List<ACL> zkRootNodeAcl;
private String zkRootNodeUsername; private String zkRootNodeUsername;
public static final int CREATE_DELETE_PERMS =
private static final int CREATE_DELETE_PERMS =
ZooDefs.Perms.CREATE | ZooDefs.Perms.DELETE; ZooDefs.Perms.CREATE | ZooDefs.Perms.DELETE;
private final String zkRootNodeAuthScheme = private final String zkRootNodeAuthScheme =
new DigestAuthenticationProvider().getScheme(); new DigestAuthenticationProvider().getScheme();
@VisibleForTesting @VisibleForTesting
protected CuratorFramework curatorFramework; protected CuratorFramework curatorFramework;
/** /**
* Given the {@link Configuration} and {@link ACL}s used (zkAcl) for * Given the {@link Configuration} and {@link ACL}s used (sourceACLs) for
* ZooKeeper access, construct the {@link ACL}s for the store's root node. * ZooKeeper access, construct the {@link ACL}s for the store's root node.
* In the constructed {@link ACL}, all the users allowed by zkAcl are given * In the constructed {@link ACL}, all the users allowed by sourceACLs are
* rwa access, while the current RM has exclude create-delete access. * given read-write-admin access, while the current RM has exclusive
* create-delete access.
* *
* To be called only when HA is enabled and the configuration doesn't set ACL * To be called only when HA is enabled and the configuration doesn't set an
* for the root node. * ACL for the root node.
* @param conf the configuration
* @param sourceACLs the source ACLs
* @return ACLs for the store's root node
* @throws java.security.NoSuchAlgorithmException thrown if the digest
* algorithm used by Zookeeper cannot be found
*/ */
@VisibleForTesting @VisibleForTesting
@Private @Private
@Unstable @Unstable
protected List<ACL> constructZkRootNodeACL( protected List<ACL> constructZkRootNodeACL(Configuration conf,
Configuration conf, List<ACL> sourceACLs) throws NoSuchAlgorithmException { List<ACL> sourceACLs) throws NoSuchAlgorithmException {
List<ACL> zkRootNodeAcl = new ArrayList<>(); List<ACL> zkRootNodeAclList = new ArrayList<>();
for (ACL acl : sourceACLs) { for (ACL acl : sourceACLs) {
zkRootNodeAcl.add(new ACL( zkRootNodeAclList.add(new ACL(
ZKUtil.removeSpecificPerms(acl.getPerms(), CREATE_DELETE_PERMS), ZKUtil.removeSpecificPerms(acl.getPerms(), CREATE_DELETE_PERMS),
acl.getId())); acl.getId()));
} }
@ -190,15 +196,16 @@ public class ZKRMStateStore extends RMStateStore {
YarnConfiguration.RM_ADDRESS, YarnConfiguration.RM_ADDRESS,
YarnConfiguration.DEFAULT_RM_ADDRESS, conf); YarnConfiguration.DEFAULT_RM_ADDRESS, conf);
Id rmId = new Id(zkRootNodeAuthScheme, Id rmId = new Id(zkRootNodeAuthScheme,
DigestAuthenticationProvider.generateDigest( DigestAuthenticationProvider.generateDigest(zkRootNodeUsername + ":"
zkRootNodeUsername + ":" + resourceManager.getZkRootNodePassword())); + resourceManager.getZkRootNodePassword()));
zkRootNodeAcl.add(new ACL(CREATE_DELETE_PERMS, rmId)); zkRootNodeAclList.add(new ACL(CREATE_DELETE_PERMS, rmId));
return zkRootNodeAcl;
return zkRootNodeAclList;
} }
@Override @Override
public synchronized void initInternal(Configuration conf) throws Exception { public synchronized void initInternal(Configuration conf)
throws IOException, NoSuchAlgorithmException {
/* Initialize fencing related paths, acls, and ops */ /* Initialize fencing related paths, acls, and ops */
znodeWorkingPath = znodeWorkingPath =
conf.get(YarnConfiguration.ZK_RM_STATE_STORE_PARENT_PATH, conf.get(YarnConfiguration.ZK_RM_STATE_STORE_PARENT_PATH,
@ -210,16 +217,19 @@ public class ZKRMStateStore extends RMStateStore {
YarnConfiguration.DEFAULT_RM_ZK_TIMEOUT_MS); YarnConfiguration.DEFAULT_RM_ZK_TIMEOUT_MS);
zkAcl = RMZKUtils.getZKAcls(conf); zkAcl = RMZKUtils.getZKAcls(conf);
if (HAUtil.isHAEnabled(conf)) { if (HAUtil.isHAEnabled(conf)) {
String zkRootNodeAclConf = HAUtil.getConfValueForRMInstance String zkRootNodeAclConf = HAUtil.getConfValueForRMInstance
(YarnConfiguration.ZK_RM_STATE_STORE_ROOT_NODE_ACL, conf); (YarnConfiguration.ZK_RM_STATE_STORE_ROOT_NODE_ACL, conf);
if (zkRootNodeAclConf != null) { if (zkRootNodeAclConf != null) {
zkRootNodeAclConf = ZKUtil.resolveConfIndirection(zkRootNodeAclConf); zkRootNodeAclConf = ZKUtil.resolveConfIndirection(zkRootNodeAclConf);
try { try {
zkRootNodeAcl = ZKUtil.parseACLs(zkRootNodeAclConf); zkRootNodeAcl = ZKUtil.parseACLs(zkRootNodeAclConf);
} catch (ZKUtil.BadAclFormatException bafe) { } catch (ZKUtil.BadAclFormatException bafe) {
LOG.error("Invalid format for " + LOG.error("Invalid format for "
YarnConfiguration.ZK_RM_STATE_STORE_ROOT_NODE_ACL); + YarnConfiguration.ZK_RM_STATE_STORE_ROOT_NODE_ACL);
throw bafe; throw bafe;
} }
} else { } else {
@ -239,6 +249,7 @@ public class ZKRMStateStore extends RMStateStore {
getNodePath(zkRootNodePath, AMRMTOKEN_SECRET_MANAGER_ROOT); getNodePath(zkRootNodePath, AMRMTOKEN_SECRET_MANAGER_ROOT);
reservationRoot = getNodePath(zkRootNodePath, RESERVATION_SYSTEM_ROOT); reservationRoot = getNodePath(zkRootNodePath, RESERVATION_SYSTEM_ROOT);
curatorFramework = resourceManager.getCurator(); curatorFramework = resourceManager.getCurator();
if (curatorFramework == null) { if (curatorFramework == null) {
curatorFramework = resourceManager.createAndStartCurator(conf); curatorFramework = resourceManager.createAndStartCurator(conf);
} }
@ -246,7 +257,6 @@ public class ZKRMStateStore extends RMStateStore {
@Override @Override
public synchronized void startInternal() throws Exception { public synchronized void startInternal() throws Exception {
// ensure root dirs exist // ensure root dirs exist
createRootDirRecursively(znodeWorkingPath); createRootDirRecursively(znodeWorkingPath);
create(zkRootNodePath); create(zkRootNodePath);
@ -272,9 +282,11 @@ public class ZKRMStateStore extends RMStateStore {
StringBuilder builder = new StringBuilder(); StringBuilder builder = new StringBuilder();
builder.append(prefix); builder.append(prefix);
for (ACL acl : getAcls) { for (ACL acl : getAcls) {
builder.append(acl.toString()); builder.append(acl.toString());
} }
builder.append(getStat.toString()); builder.append(getStat.toString());
LOG.debug(builder.toString()); LOG.debug(builder.toString());
} }
@ -301,6 +313,7 @@ public class ZKRMStateStore extends RMStateStore {
verifyActiveStatusThread.interrupt(); verifyActiveStatusThread.interrupt();
verifyActiveStatusThread.join(1000); verifyActiveStatusThread.join(1000);
} }
if (!HAUtil.isHAEnabled(getConfig())) { if (!HAUtil.isHAEnabled(getConfig())) {
IOUtils.closeStream(curatorFramework); IOUtils.closeStream(curatorFramework);
} }
@ -316,6 +329,7 @@ public class ZKRMStateStore extends RMStateStore {
String versionNodePath = getNodePath(zkRootNodePath, VERSION_NODE); String versionNodePath = getNodePath(zkRootNodePath, VERSION_NODE);
byte[] data = byte[] data =
((VersionPBImpl) CURRENT_VERSION_INFO).getProto().toByteArray(); ((VersionPBImpl) CURRENT_VERSION_INFO).getProto().toByteArray();
if (exists(versionNodePath)) { if (exists(versionNodePath)) {
safeSetData(versionNodePath, data, -1); safeSetData(versionNodePath, data, -1);
} else { } else {
@ -331,6 +345,7 @@ public class ZKRMStateStore extends RMStateStore {
byte[] data = getData(versionNodePath); byte[] data = getData(versionNodePath);
return new VersionPBImpl(VersionProto.parseFrom(data)); return new VersionPBImpl(VersionProto.parseFrom(data));
} }
return null; return null;
} }
@ -338,6 +353,7 @@ public class ZKRMStateStore extends RMStateStore {
public synchronized long getAndIncrementEpoch() throws Exception { public synchronized long getAndIncrementEpoch() throws Exception {
String epochNodePath = getNodePath(zkRootNodePath, EPOCH_NODE); String epochNodePath = getNodePath(zkRootNodePath, EPOCH_NODE);
long currentEpoch = 0; long currentEpoch = 0;
if (exists(epochNodePath)) { if (exists(epochNodePath)) {
// load current epoch // load current epoch
byte[] data = getData(epochNodePath); byte[] data = getData(epochNodePath);
@ -353,6 +369,7 @@ public class ZKRMStateStore extends RMStateStore {
.toByteArray(); .toByteArray();
safeCreate(epochNodePath, storeData, zkAcl, CreateMode.PERSISTENT); safeCreate(epochNodePath, storeData, zkAcl, CreateMode.PERSISTENT);
} }
return currentEpoch; return currentEpoch;
} }
@ -367,31 +384,37 @@ public class ZKRMStateStore extends RMStateStore {
loadAMRMTokenSecretManagerState(rmState); loadAMRMTokenSecretManagerState(rmState);
// recover reservation state // recover reservation state
loadReservationSystemState(rmState); loadReservationSystemState(rmState);
return rmState; return rmState;
} }
private void loadReservationSystemState(RMState rmState) throws Exception { private void loadReservationSystemState(RMState rmState) throws Exception {
List<String> planNodes = getChildren(reservationRoot); List<String> planNodes = getChildren(reservationRoot);
for (String planName : planNodes) { for (String planName : planNodes) {
if (LOG.isDebugEnabled()) { if (LOG.isDebugEnabled()) {
LOG.debug("Loading plan from znode: " + planName); LOG.debug("Loading plan from znode: " + planName);
} }
String planNodePath = getNodePath(reservationRoot, planName);
String planNodePath = getNodePath(reservationRoot, planName);
List<String> reservationNodes = getChildren(planNodePath); List<String> reservationNodes = getChildren(planNodePath);
for (String reservationNodeName : reservationNodes) { for (String reservationNodeName : reservationNodes) {
String reservationNodePath = getNodePath(planNodePath, String reservationNodePath =
reservationNodeName); getNodePath(planNodePath, reservationNodeName);
if (LOG.isDebugEnabled()) { if (LOG.isDebugEnabled()) {
LOG.debug("Loading reservation from znode: " + reservationNodePath); LOG.debug("Loading reservation from znode: " + reservationNodePath);
} }
byte[] reservationData = getData(reservationNodePath); byte[] reservationData = getData(reservationNodePath);
ReservationAllocationStateProto allocationState = ReservationAllocationStateProto allocationState =
ReservationAllocationStateProto.parseFrom(reservationData); ReservationAllocationStateProto.parseFrom(reservationData);
if (!rmState.getReservationState().containsKey(planName)) { if (!rmState.getReservationState().containsKey(planName)) {
rmState.getReservationState().put(planName, rmState.getReservationState().put(planName, new HashMap<>());
new HashMap<ReservationId, ReservationAllocationStateProto>());
} }
ReservationId reservationId = ReservationId reservationId =
ReservationId.parseReservationId(reservationNodeName); ReservationId.parseReservationId(reservationNodeName);
rmState.getReservationState().get(planName).put(reservationId, rmState.getReservationState().get(planName).put(reservationId,
@ -403,16 +426,17 @@ public class ZKRMStateStore extends RMStateStore {
private void loadAMRMTokenSecretManagerState(RMState rmState) private void loadAMRMTokenSecretManagerState(RMState rmState)
throws Exception { throws Exception {
byte[] data = getData(amrmTokenSecretManagerRoot); byte[] data = getData(amrmTokenSecretManagerRoot);
if (data == null) { if (data == null) {
LOG.warn("There is no data saved"); LOG.warn("There is no data saved");
return; } else {
AMRMTokenSecretManagerStatePBImpl stateData =
new AMRMTokenSecretManagerStatePBImpl(
AMRMTokenSecretManagerStateProto.parseFrom(data));
rmState.amrmTokenSecretManagerState =
AMRMTokenSecretManagerState.newInstance(
stateData.getCurrentMasterKey(), stateData.getNextMasterKey());
} }
AMRMTokenSecretManagerStatePBImpl stateData =
new AMRMTokenSecretManagerStatePBImpl(
AMRMTokenSecretManagerStateProto.parseFrom(data));
rmState.amrmTokenSecretManagerState =
AMRMTokenSecretManagerState.newInstance(
stateData.getCurrentMasterKey(), stateData.getNextMasterKey());
} }
private synchronized void loadRMDTSecretManagerState(RMState rmState) private synchronized void loadRMDTSecretManagerState(RMState rmState)
@ -423,8 +447,8 @@ public class ZKRMStateStore extends RMStateStore {
} }
private void loadRMDelegationKeyState(RMState rmState) throws Exception { private void loadRMDelegationKeyState(RMState rmState) throws Exception {
List<String> childNodes = List<String> childNodes = getChildren(dtMasterKeysRootPath);
getChildren(dtMasterKeysRootPath);
for (String childNodeName : childNodes) { for (String childNodeName : childNodes) {
String childNodePath = getNodePath(dtMasterKeysRootPath, childNodeName); String childNodePath = getNodePath(dtMasterKeysRootPath, childNodeName);
byte[] childData = getData(childNodePath); byte[] childData = getData(childNodePath);
@ -435,34 +459,30 @@ public class ZKRMStateStore extends RMStateStore {
} }
ByteArrayInputStream is = new ByteArrayInputStream(childData); ByteArrayInputStream is = new ByteArrayInputStream(childData);
DataInputStream fsIn = new DataInputStream(is);
try { try (DataInputStream fsIn = new DataInputStream(is)) {
if (childNodeName.startsWith(DELEGATION_KEY_PREFIX)) { if (childNodeName.startsWith(DELEGATION_KEY_PREFIX)) {
DelegationKey key = new DelegationKey(); DelegationKey key = new DelegationKey();
key.readFields(fsIn); key.readFields(fsIn);
rmState.rmSecretManagerState.masterKeyState.add(key); rmState.rmSecretManagerState.masterKeyState.add(key);
if (LOG.isDebugEnabled()) { if (LOG.isDebugEnabled()) {
LOG.debug("Loaded delegation key: keyId=" + key.getKeyId() LOG.debug("Loaded delegation key: keyId=" + key.getKeyId()
+ ", expirationDate=" + key.getExpiryDate()); + ", expirationDate=" + key.getExpiryDate());
} }
} }
} finally {
is.close();
} }
} }
} }
private void loadRMSequentialNumberState(RMState rmState) throws Exception { private void loadRMSequentialNumberState(RMState rmState) throws Exception {
byte[] seqData = getData(dtSequenceNumberPath); byte[] seqData = getData(dtSequenceNumberPath);
if (seqData != null) { if (seqData != null) {
ByteArrayInputStream seqIs = new ByteArrayInputStream(seqData); ByteArrayInputStream seqIs = new ByteArrayInputStream(seqData);
DataInputStream seqIn = new DataInputStream(seqIs);
try { try (DataInputStream seqIn = new DataInputStream(seqIs)) {
rmState.rmSecretManagerState.dtSequenceNumber = seqIn.readInt(); rmState.rmSecretManagerState.dtSequenceNumber = seqIn.readInt();
} finally {
seqIn.close();
} }
} }
} }
@ -470,6 +490,7 @@ public class ZKRMStateStore extends RMStateStore {
private void loadRMDelegationTokenState(RMState rmState) throws Exception { private void loadRMDelegationTokenState(RMState rmState) throws Exception {
List<String> childNodes = List<String> childNodes =
getChildren(delegationTokensRootPath); getChildren(delegationTokensRootPath);
for (String childNodeName : childNodes) { for (String childNodeName : childNodes) {
String childNodePath = String childNodePath =
getNodePath(delegationTokensRootPath, childNodeName); getNodePath(delegationTokensRootPath, childNodeName);
@ -481,9 +502,8 @@ public class ZKRMStateStore extends RMStateStore {
} }
ByteArrayInputStream is = new ByteArrayInputStream(childData); ByteArrayInputStream is = new ByteArrayInputStream(childData);
DataInputStream fsIn = new DataInputStream(is);
try { try (DataInputStream fsIn = new DataInputStream(is)) {
if (childNodeName.startsWith(DELEGATION_TOKEN_PREFIX)) { if (childNodeName.startsWith(DELEGATION_TOKEN_PREFIX)) {
RMDelegationTokenIdentifierData identifierData = RMDelegationTokenIdentifierData identifierData =
new RMDelegationTokenIdentifierData(); new RMDelegationTokenIdentifierData();
@ -493,36 +513,40 @@ public class ZKRMStateStore extends RMStateStore {
long renewDate = identifierData.getRenewDate(); long renewDate = identifierData.getRenewDate();
rmState.rmSecretManagerState.delegationTokenState.put(identifier, rmState.rmSecretManagerState.delegationTokenState.put(identifier,
renewDate); renewDate);
if (LOG.isDebugEnabled()) { if (LOG.isDebugEnabled()) {
LOG.debug("Loaded RMDelegationTokenIdentifier: " + identifier LOG.debug("Loaded RMDelegationTokenIdentifier: " + identifier
+ " renewDate=" + renewDate); + " renewDate=" + renewDate);
} }
} }
} finally {
is.close();
} }
} }
} }
private synchronized void loadRMAppState(RMState rmState) throws Exception { private synchronized void loadRMAppState(RMState rmState) throws Exception {
List<String> childNodes = getChildren(rmAppRoot); List<String> childNodes = getChildren(rmAppRoot);
for (String childNodeName : childNodes) { for (String childNodeName : childNodes) {
String childNodePath = getNodePath(rmAppRoot, childNodeName); String childNodePath = getNodePath(rmAppRoot, childNodeName);
byte[] childData = getData(childNodePath); byte[] childData = getData(childNodePath);
if (childNodeName.startsWith(ApplicationId.appIdStrPrefix)) { if (childNodeName.startsWith(ApplicationId.appIdStrPrefix)) {
// application // application
if (LOG.isDebugEnabled()) { if (LOG.isDebugEnabled()) {
LOG.debug("Loading application from znode: " + childNodeName); LOG.debug("Loading application from znode: " + childNodeName);
} }
ApplicationId appId = ApplicationId.fromString(childNodeName); ApplicationId appId = ApplicationId.fromString(childNodeName);
ApplicationStateDataPBImpl appState = ApplicationStateDataPBImpl appState =
new ApplicationStateDataPBImpl( new ApplicationStateDataPBImpl(
ApplicationStateDataProto.parseFrom(childData)); ApplicationStateDataProto.parseFrom(childData));
if (!appId.equals( if (!appId.equals(
appState.getApplicationSubmissionContext().getApplicationId())) { appState.getApplicationSubmissionContext().getApplicationId())) {
throw new YarnRuntimeException("The child node name is different " + throw new YarnRuntimeException("The child node name is different "
"from the application id"); + "from the application id");
} }
rmState.appState.put(appId, appState); rmState.appState.put(appId, appState);
loadApplicationAttemptState(appState, appId); loadApplicationAttemptState(appState, appId);
} else { } else {
@ -536,6 +560,7 @@ public class ZKRMStateStore extends RMStateStore {
throws Exception { throws Exception {
String appPath = getNodePath(rmAppRoot, appId.toString()); String appPath = getNodePath(rmAppRoot, appId.toString());
List<String> attempts = getChildren(appPath); List<String> attempts = getChildren(appPath);
for (String attemptIDStr : attempts) { for (String attemptIDStr : attempts) {
if (attemptIDStr.startsWith(ApplicationAttemptId.appAttemptIdStrPrefix)) { if (attemptIDStr.startsWith(ApplicationAttemptId.appAttemptIdStrPrefix)) {
String attemptPath = getNodePath(appPath, attemptIDStr); String attemptPath = getNodePath(appPath, attemptIDStr);
@ -548,6 +573,7 @@ public class ZKRMStateStore extends RMStateStore {
appState.attempts.put(attemptState.getAttemptId(), attemptState); appState.attempts.put(attemptState.getAttemptId(), attemptState);
} }
} }
LOG.debug("Done loading applications from ZK state store"); LOG.debug("Done loading applications from ZK state store");
} }
@ -559,21 +585,23 @@ public class ZKRMStateStore extends RMStateStore {
if (LOG.isDebugEnabled()) { if (LOG.isDebugEnabled()) {
LOG.debug("Storing info for app: " + appId + " at: " + nodeCreatePath); LOG.debug("Storing info for app: " + appId + " at: " + nodeCreatePath);
} }
byte[] appStateData = appStateDataPB.getProto().toByteArray(); byte[] appStateData = appStateDataPB.getProto().toByteArray();
safeCreate(nodeCreatePath, appStateData, zkAcl, safeCreate(nodeCreatePath, appStateData, zkAcl,
CreateMode.PERSISTENT); CreateMode.PERSISTENT);
} }
@Override @Override
public synchronized void updateApplicationStateInternal(ApplicationId appId, protected synchronized void updateApplicationStateInternal(
ApplicationStateData appStateDataPB) throws Exception { ApplicationId appId, ApplicationStateData appStateDataPB)
throws Exception {
String nodeUpdatePath = getNodePath(rmAppRoot, appId.toString()); String nodeUpdatePath = getNodePath(rmAppRoot, appId.toString());
if (LOG.isDebugEnabled()) { if (LOG.isDebugEnabled()) {
LOG.debug("Storing final state info for app: " + appId + " at: " LOG.debug("Storing final state info for app: " + appId + " at: "
+ nodeUpdatePath); + nodeUpdatePath);
} }
byte[] appStateData = appStateDataPB.getProto().toByteArray(); byte[] appStateData = appStateDataPB.getProto().toByteArray();
if (exists(nodeUpdatePath)) { if (exists(nodeUpdatePath)) {
@ -587,7 +615,7 @@ public class ZKRMStateStore extends RMStateStore {
} }
@Override @Override
public synchronized void storeApplicationAttemptStateInternal( protected synchronized void storeApplicationAttemptStateInternal(
ApplicationAttemptId appAttemptId, ApplicationAttemptId appAttemptId,
ApplicationAttemptStateData attemptStateDataPB) ApplicationAttemptStateData attemptStateDataPB)
throws Exception { throws Exception {
@ -599,13 +627,13 @@ public class ZKRMStateStore extends RMStateStore {
LOG.debug("Storing info for attempt: " + appAttemptId + " at: " LOG.debug("Storing info for attempt: " + appAttemptId + " at: "
+ nodeCreatePath); + nodeCreatePath);
} }
byte[] attemptStateData = attemptStateDataPB.getProto().toByteArray(); byte[] attemptStateData = attemptStateDataPB.getProto().toByteArray();
safeCreate(nodeCreatePath, attemptStateData, zkAcl, safeCreate(nodeCreatePath, attemptStateData, zkAcl, CreateMode.PERSISTENT);
CreateMode.PERSISTENT);
} }
@Override @Override
public synchronized void updateApplicationAttemptStateInternal( protected synchronized void updateApplicationAttemptStateInternal(
ApplicationAttemptId appAttemptId, ApplicationAttemptId appAttemptId,
ApplicationAttemptStateData attemptStateDataPB) ApplicationAttemptStateData attemptStateDataPB)
throws Exception { throws Exception {
@ -613,10 +641,12 @@ public class ZKRMStateStore extends RMStateStore {
String appAttemptIdStr = appAttemptId.toString(); String appAttemptIdStr = appAttemptId.toString();
String appDirPath = getNodePath(rmAppRoot, appIdStr); String appDirPath = getNodePath(rmAppRoot, appIdStr);
String nodeUpdatePath = getNodePath(appDirPath, appAttemptIdStr); String nodeUpdatePath = getNodePath(appDirPath, appAttemptIdStr);
if (LOG.isDebugEnabled()) { if (LOG.isDebugEnabled()) {
LOG.debug("Storing final state info for attempt: " + appAttemptIdStr LOG.debug("Storing final state info for attempt: " + appAttemptIdStr
+ " at: " + nodeUpdatePath); + " at: " + nodeUpdatePath);
} }
byte[] attemptStateData = attemptStateDataPB.getProto().toByteArray(); byte[] attemptStateData = attemptStateDataPB.getProto().toByteArray();
if (exists(nodeUpdatePath)) { if (exists(nodeUpdatePath)) {
@ -630,25 +660,24 @@ public class ZKRMStateStore extends RMStateStore {
} }
@Override @Override
public synchronized void removeApplicationAttemptInternal( protected synchronized void removeApplicationAttemptInternal(
ApplicationAttemptId appAttemptId) ApplicationAttemptId appAttemptId) throws Exception {
throws Exception {
String appId = appAttemptId.getApplicationId().toString(); String appId = appAttemptId.getApplicationId().toString();
String appIdRemovePath = getNodePath(rmAppRoot, appId); String appIdRemovePath = getNodePath(rmAppRoot, appId);
String attemptIdRemovePath = getNodePath(appIdRemovePath, String attemptIdRemovePath =
appAttemptId.toString()); getNodePath(appIdRemovePath, appAttemptId.toString());
if (LOG.isDebugEnabled()) { if (LOG.isDebugEnabled()) {
LOG.debug("Removing info for attempt: " + appAttemptId + " at: " LOG.debug("Removing info for attempt: " + appAttemptId + " at: "
+ attemptIdRemovePath); + attemptIdRemovePath);
} }
safeDelete(attemptIdRemovePath); safeDelete(attemptIdRemovePath);
} }
@Override @Override
public synchronized void removeApplicationStateInternal( protected synchronized void removeApplicationStateInternal(
ApplicationStateData appState) ApplicationStateData appState) throws Exception {
throws Exception {
String appId = appState.getApplicationSubmissionContext().getApplicationId() String appId = appState.getApplicationSubmissionContext().getApplicationId()
.toString(); .toString();
String appIdRemovePath = getNodePath(rmAppRoot, appId); String appIdRemovePath = getNodePath(rmAppRoot, appId);
@ -659,9 +688,11 @@ public class ZKRMStateStore extends RMStateStore {
} }
for (ApplicationAttemptId attemptId : appState.attempts.keySet()) { for (ApplicationAttemptId attemptId : appState.attempts.keySet()) {
String attemptRemovePath = getNodePath(appIdRemovePath, attemptId.toString()); String attemptRemovePath =
getNodePath(appIdRemovePath, attemptId.toString());
safeDelete(attemptRemovePath); safeDelete(attemptRemovePath);
} }
safeDelete(appIdRemovePath); safeDelete(appIdRemovePath);
} }
@ -680,10 +711,12 @@ public class ZKRMStateStore extends RMStateStore {
String nodeRemovePath = String nodeRemovePath =
getNodePath(delegationTokensRootPath, DELEGATION_TOKEN_PREFIX getNodePath(delegationTokensRootPath, DELEGATION_TOKEN_PREFIX
+ rmDTIdentifier.getSequenceNumber()); + rmDTIdentifier.getSequenceNumber());
if (LOG.isDebugEnabled()) { if (LOG.isDebugEnabled()) {
LOG.debug("Removing RMDelegationToken_" LOG.debug("Removing RMDelegationToken_"
+ rmDTIdentifier.getSequenceNumber()); + rmDTIdentifier.getSequenceNumber());
} }
safeDelete(nodeRemovePath); safeDelete(nodeRemovePath);
} }
@ -695,6 +728,7 @@ public class ZKRMStateStore extends RMStateStore {
String nodeRemovePath = String nodeRemovePath =
getNodePath(delegationTokensRootPath, DELEGATION_TOKEN_PREFIX getNodePath(delegationTokensRootPath, DELEGATION_TOKEN_PREFIX
+ rmDTIdentifier.getSequenceNumber()); + rmDTIdentifier.getSequenceNumber());
if (exists(nodeRemovePath)) { if (exists(nodeRemovePath)) {
// in case znode exists // in case znode exists
addStoreOrUpdateOps(trx, rmDTIdentifier, renewDate, true); addStoreOrUpdateOps(trx, rmDTIdentifier, renewDate, true);
@ -703,6 +737,7 @@ public class ZKRMStateStore extends RMStateStore {
addStoreOrUpdateOps(trx, rmDTIdentifier, renewDate, false); addStoreOrUpdateOps(trx, rmDTIdentifier, renewDate, false);
LOG.debug("Attempted to update a non-existing znode " + nodeRemovePath); LOG.debug("Attempted to update a non-existing znode " + nodeRemovePath);
} }
trx.commit(); trx.commit();
} }
@ -710,17 +745,16 @@ public class ZKRMStateStore extends RMStateStore {
RMDelegationTokenIdentifier rmDTIdentifier, Long renewDate, RMDelegationTokenIdentifier rmDTIdentifier, Long renewDate,
boolean isUpdate) throws Exception { boolean isUpdate) throws Exception {
// store RM delegation token // store RM delegation token
String nodeCreatePath = String nodeCreatePath = getNodePath(delegationTokensRootPath,
getNodePath(delegationTokensRootPath, DELEGATION_TOKEN_PREFIX DELEGATION_TOKEN_PREFIX + rmDTIdentifier.getSequenceNumber());
+ rmDTIdentifier.getSequenceNumber());
ByteArrayOutputStream seqOs = new ByteArrayOutputStream();
DataOutputStream seqOut = new DataOutputStream(seqOs);
RMDelegationTokenIdentifierData identifierData = RMDelegationTokenIdentifierData identifierData =
new RMDelegationTokenIdentifierData(rmDTIdentifier, renewDate); new RMDelegationTokenIdentifierData(rmDTIdentifier, renewDate);
try { ByteArrayOutputStream seqOs = new ByteArrayOutputStream();
try (DataOutputStream seqOut = new DataOutputStream(seqOs)) {
if (LOG.isDebugEnabled()) { if (LOG.isDebugEnabled()) {
LOG.debug((isUpdate ? "Storing " : "Updating ") + "RMDelegationToken_" + LOG.debug((isUpdate ? "Storing " : "Updating ") + "RMDelegationToken_"
rmDTIdentifier.getSequenceNumber()); + rmDTIdentifier.getSequenceNumber());
} }
if (isUpdate) { if (isUpdate) {
@ -730,24 +764,23 @@ public class ZKRMStateStore extends RMStateStore {
CreateMode.PERSISTENT); CreateMode.PERSISTENT);
// Update Sequence number only while storing DT // Update Sequence number only while storing DT
seqOut.writeInt(rmDTIdentifier.getSequenceNumber()); seqOut.writeInt(rmDTIdentifier.getSequenceNumber());
if (LOG.isDebugEnabled()) { if (LOG.isDebugEnabled()) {
LOG.debug((isUpdate ? "Storing " : "Updating ") + LOG.debug((isUpdate ? "Storing " : "Updating ")
dtSequenceNumberPath + ". SequenceNumber: " + dtSequenceNumberPath + ". SequenceNumber: "
+ rmDTIdentifier.getSequenceNumber()); + rmDTIdentifier.getSequenceNumber());
} }
trx.setData(dtSequenceNumberPath, seqOs.toByteArray(), -1); trx.setData(dtSequenceNumberPath, seqOs.toByteArray(), -1);
} }
} finally {
seqOs.close();
} }
} }
@Override @Override
protected synchronized void storeRMDTMasterKeyState( protected synchronized void storeRMDTMasterKeyState(
DelegationKey delegationKey) throws Exception { DelegationKey delegationKey) throws Exception {
String nodeCreatePath = String nodeCreatePath = getNodePath(dtMasterKeysRootPath,
getNodePath(dtMasterKeysRootPath, DELEGATION_KEY_PREFIX DELEGATION_KEY_PREFIX + delegationKey.getKeyId());
+ delegationKey.getKeyId());
if (LOG.isDebugEnabled()) { if (LOG.isDebugEnabled()) {
LOG.debug("Storing RMDelegationKey_" + delegationKey.getKeyId()); LOG.debug("Storing RMDelegationKey_" + delegationKey.getKeyId());
} }
@ -765,9 +798,11 @@ public class ZKRMStateStore extends RMStateStore {
String nodeRemovePath = String nodeRemovePath =
getNodePath(dtMasterKeysRootPath, DELEGATION_KEY_PREFIX getNodePath(dtMasterKeysRootPath, DELEGATION_KEY_PREFIX
+ delegationKey.getKeyId()); + delegationKey.getKeyId());
if (LOG.isDebugEnabled()) { if (LOG.isDebugEnabled()) {
LOG.debug("Removing RMDelegationKey_" + delegationKey.getKeyId()); LOG.debug("Removing RMDelegationKey_" + delegationKey.getKeyId());
} }
safeDelete(nodeRemovePath); safeDelete(nodeRemovePath);
} }
@ -789,30 +824,31 @@ public class ZKRMStateStore extends RMStateStore {
} }
@Override @Override
public synchronized void storeOrUpdateAMRMTokenSecretManagerState( protected synchronized void storeOrUpdateAMRMTokenSecretManagerState(
AMRMTokenSecretManagerState amrmTokenSecretManagerState, boolean isUpdate) AMRMTokenSecretManagerState amrmTokenSecretManagerState, boolean isUpdate)
throws Exception { throws Exception {
AMRMTokenSecretManagerState data = AMRMTokenSecretManagerState data =
AMRMTokenSecretManagerState.newInstance(amrmTokenSecretManagerState); AMRMTokenSecretManagerState.newInstance(amrmTokenSecretManagerState);
byte[] stateData = data.getProto().toByteArray(); byte[] stateData = data.getProto().toByteArray();
safeSetData(amrmTokenSecretManagerRoot, stateData, -1); safeSetData(amrmTokenSecretManagerRoot, stateData, -1);
} }
@Override @Override
protected synchronized void removeReservationState(String planName, protected synchronized void removeReservationState(String planName,
String reservationIdName) String reservationIdName) throws Exception {
throws Exception { String planNodePath = getNodePath(reservationRoot, planName);
String planNodePath = String reservationPath = getNodePath(planNodePath, reservationIdName);
getNodePath(reservationRoot, planName);
String reservationPath = getNodePath(planNodePath,
reservationIdName);
if (LOG.isDebugEnabled()) { if (LOG.isDebugEnabled()) {
LOG.debug("Removing reservationallocation " + reservationIdName + " for" + LOG.debug("Removing reservationallocation " + reservationIdName
" plan " + planName); + " for" + " plan " + planName);
} }
safeDelete(reservationPath); safeDelete(reservationPath);
List<String> reservationNodes = getChildren(planNodePath); List<String> reservationNodes = getChildren(planNodePath);
if (reservationNodes.isEmpty()) { if (reservationNodes.isEmpty()) {
safeDelete(planNodePath); safeDelete(planNodePath);
} }
@ -821,11 +857,10 @@ public class ZKRMStateStore extends RMStateStore {
@Override @Override
protected synchronized void storeReservationState( protected synchronized void storeReservationState(
ReservationAllocationStateProto reservationAllocation, String planName, ReservationAllocationStateProto reservationAllocation, String planName,
String reservationIdName) String reservationIdName) throws Exception {
throws Exception {
SafeTransaction trx = new SafeTransaction(); SafeTransaction trx = new SafeTransaction();
addOrUpdateReservationState( addOrUpdateReservationState(reservationAllocation, planName,
reservationAllocation, planName, reservationIdName, trx, false); reservationIdName, trx, false);
trx.commit(); trx.commit();
} }
@ -843,6 +878,7 @@ public class ZKRMStateStore extends RMStateStore {
if (LOG.isDebugEnabled()) { if (LOG.isDebugEnabled()) {
LOG.debug("Creating plan node: " + planName + " at: " + planCreatePath); LOG.debug("Creating plan node: " + planName + " at: " + planCreatePath);
} }
trx.create(planCreatePath, null, zkAcl, CreateMode.PERSISTENT); trx.create(planCreatePath, null, zkAcl, CreateMode.PERSISTENT);
} }
@ -871,6 +907,7 @@ public class ZKRMStateStore extends RMStateStore {
Preconditions.checkArgument(pathParts.length >= 1 && pathParts[0].isEmpty(), Preconditions.checkArgument(pathParts.length >= 1 && pathParts[0].isEmpty(),
"Invalid path: %s", path); "Invalid path: %s", path);
StringBuilder sb = new StringBuilder(); StringBuilder sb = new StringBuilder();
for (int i = 1; i < pathParts.length; i++) { for (int i = 1; i < pathParts.length; i++) {
sb.append("/").append(pathParts[i]); sb.append("/").append(pathParts[i]);
create(sb.toString()); create(sb.toString());
@ -947,10 +984,9 @@ public class ZKRMStateStore extends RMStateStore {
SafeTransaction() throws Exception { SafeTransaction() throws Exception {
CuratorTransaction transaction = curatorFramework.inTransaction(); CuratorTransaction transaction = curatorFramework.inTransaction();
transactionFinal = transactionFinal = transaction.create()
transaction.create() .withMode(CreateMode.PERSISTENT).withACL(zkAcl)
.withMode(CreateMode.PERSISTENT).withACL(zkAcl) .forPath(fencingNodePath, new byte[0]).and();
.forPath(fencingNodePath, new byte[0]).and();
} }
public void commit() throws Exception { public void commit() throws Exception {
@ -985,19 +1021,17 @@ public class ZKRMStateStore extends RMStateStore {
super(VerifyActiveStatusThread.class.getName()); super(VerifyActiveStatusThread.class.getName());
} }
@Override
public void run() { public void run() {
try { try {
while (true) { while (!isFencedState()) {
if(isFencedState()) {
break;
}
// Create and delete fencing node // Create and delete fencing node
new SafeTransaction().commit(); new SafeTransaction().commit();
Thread.sleep(zkSessionTimeout); Thread.sleep(zkSessionTimeout);
} }
} catch (InterruptedException ie) { } catch (InterruptedException ie) {
LOG.info(VerifyActiveStatusThread.class.getName() + " thread " + LOG.info(getName() + " thread interrupted! Exiting!");
"interrupted! Exiting!"); interrupt();
} catch (Exception e) { } catch (Exception e) {
notifyStoreOperationFailed(new StoreFencedException()); notifyStoreOperationFailed(new StoreFencedException());
} }