From bcb2528a51c33e4caff8d744c5e14c1accfc47d0 Mon Sep 17 00:00:00 2001 From: Robert Kanter Date: Wed, 28 Sep 2016 14:56:41 -0700 Subject: [PATCH] YARN-5400. Light cleanup in ZKRMStateStore (templedf via rkanter) --- .../server/resourcemanager/RMZKUtils.java | 19 +- .../resourcemanager/ResourceManager.java | 2 +- .../recovery/ZKRMStateStore.java | 260 ++++++++++-------- 3 files changed, 161 insertions(+), 120 deletions(-) diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMZKUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMZKUtils.java index d78068f60f7..4b8561dae15 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMZKUtils.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMZKUtils.java @@ -18,6 +18,7 @@ package org.apache.hadoop.yarn.server.resourcemanager; +import java.io.IOException; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.classification.InterfaceAudience; @@ -37,9 +38,12 @@ public class RMZKUtils { private static final Log LOG = LogFactory.getLog(RMZKUtils.class); /** - * Utility method to fetch the ZK ACLs from the configuration + * Utility method to fetch the ZK ACLs from the configuration. + * + * @throws java.io.IOException if the Zookeeper ACLs configuration file + * cannot be read */ - public static List getZKAcls(Configuration conf) throws Exception { + public static List getZKAcls(Configuration conf) throws IOException { // Parse authentication from configuration. String zkAclConf = conf.get(YarnConfiguration.RM_ZK_ACL, @@ -47,17 +51,20 @@ public class RMZKUtils { try { zkAclConf = ZKUtil.resolveConfIndirection(zkAclConf); return ZKUtil.parseACLs(zkAclConf); - } catch (Exception e) { + } catch (IOException | ZKUtil.BadAclFormatException e) { LOG.error("Couldn't read ACLs based on " + YarnConfiguration.RM_ZK_ACL); throw e; } } /** - * Utility method to fetch ZK auth info from the configuration + * Utility method to fetch ZK auth info from the configuration. + * + * @throws java.io.IOException if the Zookeeper ACLs configuration file + * cannot be read */ public static List getZKAuths(Configuration conf) - throws Exception { + throws IOException { String zkAuthConf = conf.get(YarnConfiguration.RM_ZK_AUTH); try { zkAuthConf = ZKUtil.resolveConfIndirection(zkAuthConf); @@ -66,7 +73,7 @@ public class RMZKUtils { } else { return Collections.emptyList(); } - } catch (Exception e) { + } catch (IOException | ZKUtil.BadAuthFormatException e) { LOG.error("Couldn't read Auth based on " + YarnConfiguration.RM_ZK_AUTH); throw e; } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java index bf72fc1465e..8a6997d8c67 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java @@ -320,7 +320,7 @@ public class ResourceManager extends CompositeService implements Recoverable { } public CuratorFramework createAndStartCurator(Configuration conf) - throws Exception { + throws IOException { String zkHostPort = conf.get(YarnConfiguration.RM_ZK_ADDRESS); if (zkHostPort == null) { throw new YarnRuntimeException( diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/ZKRMStateStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/ZKRMStateStore.java index c24b3e9f5e7..51bb74df636 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/ZKRMStateStore.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/ZKRMStateStore.java @@ -56,7 +56,6 @@ import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.AM import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.ApplicationAttemptStateDataPBImpl; import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.ApplicationStateDataPBImpl; import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.EpochPBImpl; -import org.apache.hadoop.yarn.util.ConverterUtils; import org.apache.zookeeper.CreateMode; import org.apache.zookeeper.ZooDefs; import org.apache.zookeeper.data.ACL; @@ -68,8 +67,8 @@ import java.io.ByteArrayInputStream; import java.io.ByteArrayOutputStream; import java.io.DataInputStream; import java.io.DataOutputStream; +import java.io.IOException; import java.security.NoSuchAlgorithmException; -import java.security.SecureRandom; import java.util.ArrayList; import java.util.HashMap; import java.util.List; @@ -121,21 +120,18 @@ import java.util.List; @Private @Unstable public class ZKRMStateStore extends RMStateStore { - - public static final Log LOG = LogFactory.getLog(ZKRMStateStore.class); - private final SecureRandom random = new SecureRandom(); - - protected static final String ROOT_ZNODE_NAME = "ZKRMStateRoot"; - protected static final Version CURRENT_VERSION_INFO = Version - .newInstance(1, 3); + private static final Log LOG = LogFactory.getLog(ZKRMStateStore.class); private static final String RM_DELEGATION_TOKENS_ROOT_ZNODE_NAME = "RMDelegationTokensRoot"; private static final String RM_DT_SEQUENTIAL_NUMBER_ZNODE_NAME = "RMDTSequentialNumber"; private static final String RM_DT_MASTER_KEYS_ROOT_ZNODE_NAME = "RMDTMasterKeysRoot"; + protected static final String ROOT_ZNODE_NAME = "ZKRMStateRoot"; + protected static final Version CURRENT_VERSION_INFO = + Version.newInstance(1, 3); - /** Znode paths */ + /* Znode paths */ private String zkRootNodePath; private String rmAppRoot; private String rmDTSecretManagerRoot; @@ -144,44 +140,54 @@ public class ZKRMStateStore extends RMStateStore { private String dtSequenceNumberPath; private String amrmTokenSecretManagerRoot; private String reservationRoot; + @VisibleForTesting protected String znodeWorkingPath; - /** Fencing related variables */ + /* Fencing related variables */ private static final String FENCING_LOCK = "RM_ZK_FENCING_LOCK"; private String fencingNodePath; private Thread verifyActiveStatusThread; private int zkSessionTimeout; - /** ACL and auth info */ + /* ACL and auth info */ private List zkAcl; @VisibleForTesting List zkRootNodeAcl; private String zkRootNodeUsername; - public static final int CREATE_DELETE_PERMS = + + private static final int CREATE_DELETE_PERMS = ZooDefs.Perms.CREATE | ZooDefs.Perms.DELETE; private final String zkRootNodeAuthScheme = new DigestAuthenticationProvider().getScheme(); @VisibleForTesting protected CuratorFramework curatorFramework; + /** - * Given the {@link Configuration} and {@link ACL}s used (zkAcl) for + * Given the {@link Configuration} and {@link ACL}s used (sourceACLs) for * ZooKeeper access, construct the {@link ACL}s for the store's root node. - * In the constructed {@link ACL}, all the users allowed by zkAcl are given - * rwa access, while the current RM has exclude create-delete access. + * In the constructed {@link ACL}, all the users allowed by sourceACLs are + * given read-write-admin access, while the current RM has exclusive + * create-delete access. * - * To be called only when HA is enabled and the configuration doesn't set ACL - * for the root node. + * To be called only when HA is enabled and the configuration doesn't set an + * ACL for the root node. + * @param conf the configuration + * @param sourceACLs the source ACLs + * @return ACLs for the store's root node + * @throws java.security.NoSuchAlgorithmException thrown if the digest + * algorithm used by Zookeeper cannot be found */ @VisibleForTesting @Private @Unstable - protected List constructZkRootNodeACL( - Configuration conf, List sourceACLs) throws NoSuchAlgorithmException { - List zkRootNodeAcl = new ArrayList<>(); + protected List constructZkRootNodeACL(Configuration conf, + List sourceACLs) throws NoSuchAlgorithmException { + List zkRootNodeAclList = new ArrayList<>(); + for (ACL acl : sourceACLs) { - zkRootNodeAcl.add(new ACL( + zkRootNodeAclList.add(new ACL( ZKUtil.removeSpecificPerms(acl.getPerms(), CREATE_DELETE_PERMS), acl.getId())); } @@ -190,15 +196,16 @@ public class ZKRMStateStore extends RMStateStore { YarnConfiguration.RM_ADDRESS, YarnConfiguration.DEFAULT_RM_ADDRESS, conf); Id rmId = new Id(zkRootNodeAuthScheme, - DigestAuthenticationProvider.generateDigest( - zkRootNodeUsername + ":" + resourceManager.getZkRootNodePassword())); - zkRootNodeAcl.add(new ACL(CREATE_DELETE_PERMS, rmId)); - return zkRootNodeAcl; + DigestAuthenticationProvider.generateDigest(zkRootNodeUsername + ":" + + resourceManager.getZkRootNodePassword())); + zkRootNodeAclList.add(new ACL(CREATE_DELETE_PERMS, rmId)); + + return zkRootNodeAclList; } @Override - public synchronized void initInternal(Configuration conf) throws Exception { - + public synchronized void initInternal(Configuration conf) + throws IOException, NoSuchAlgorithmException { /* Initialize fencing related paths, acls, and ops */ znodeWorkingPath = conf.get(YarnConfiguration.ZK_RM_STATE_STORE_PARENT_PATH, @@ -210,16 +217,19 @@ public class ZKRMStateStore extends RMStateStore { YarnConfiguration.DEFAULT_RM_ZK_TIMEOUT_MS); zkAcl = RMZKUtils.getZKAcls(conf); + if (HAUtil.isHAEnabled(conf)) { String zkRootNodeAclConf = HAUtil.getConfValueForRMInstance (YarnConfiguration.ZK_RM_STATE_STORE_ROOT_NODE_ACL, conf); + if (zkRootNodeAclConf != null) { zkRootNodeAclConf = ZKUtil.resolveConfIndirection(zkRootNodeAclConf); + try { zkRootNodeAcl = ZKUtil.parseACLs(zkRootNodeAclConf); } catch (ZKUtil.BadAclFormatException bafe) { - LOG.error("Invalid format for " + - YarnConfiguration.ZK_RM_STATE_STORE_ROOT_NODE_ACL); + LOG.error("Invalid format for " + + YarnConfiguration.ZK_RM_STATE_STORE_ROOT_NODE_ACL); throw bafe; } } else { @@ -239,6 +249,7 @@ public class ZKRMStateStore extends RMStateStore { getNodePath(zkRootNodePath, AMRMTOKEN_SECRET_MANAGER_ROOT); reservationRoot = getNodePath(zkRootNodePath, RESERVATION_SYSTEM_ROOT); curatorFramework = resourceManager.getCurator(); + if (curatorFramework == null) { curatorFramework = resourceManager.createAndStartCurator(conf); } @@ -246,7 +257,6 @@ public class ZKRMStateStore extends RMStateStore { @Override public synchronized void startInternal() throws Exception { - // ensure root dirs exist createRootDirRecursively(znodeWorkingPath); create(zkRootNodePath); @@ -272,9 +282,11 @@ public class ZKRMStateStore extends RMStateStore { StringBuilder builder = new StringBuilder(); builder.append(prefix); + for (ACL acl : getAcls) { builder.append(acl.toString()); } + builder.append(getStat.toString()); LOG.debug(builder.toString()); } @@ -301,6 +313,7 @@ public class ZKRMStateStore extends RMStateStore { verifyActiveStatusThread.interrupt(); verifyActiveStatusThread.join(1000); } + if (!HAUtil.isHAEnabled(getConfig())) { IOUtils.closeStream(curatorFramework); } @@ -316,6 +329,7 @@ public class ZKRMStateStore extends RMStateStore { String versionNodePath = getNodePath(zkRootNodePath, VERSION_NODE); byte[] data = ((VersionPBImpl) CURRENT_VERSION_INFO).getProto().toByteArray(); + if (exists(versionNodePath)) { safeSetData(versionNodePath, data, -1); } else { @@ -331,6 +345,7 @@ public class ZKRMStateStore extends RMStateStore { byte[] data = getData(versionNodePath); return new VersionPBImpl(VersionProto.parseFrom(data)); } + return null; } @@ -338,6 +353,7 @@ public class ZKRMStateStore extends RMStateStore { public synchronized long getAndIncrementEpoch() throws Exception { String epochNodePath = getNodePath(zkRootNodePath, EPOCH_NODE); long currentEpoch = 0; + if (exists(epochNodePath)) { // load current epoch byte[] data = getData(epochNodePath); @@ -353,6 +369,7 @@ public class ZKRMStateStore extends RMStateStore { .toByteArray(); safeCreate(epochNodePath, storeData, zkAcl, CreateMode.PERSISTENT); } + return currentEpoch; } @@ -367,31 +384,37 @@ public class ZKRMStateStore extends RMStateStore { loadAMRMTokenSecretManagerState(rmState); // recover reservation state loadReservationSystemState(rmState); + return rmState; } private void loadReservationSystemState(RMState rmState) throws Exception { List planNodes = getChildren(reservationRoot); + for (String planName : planNodes) { if (LOG.isDebugEnabled()) { LOG.debug("Loading plan from znode: " + planName); } - String planNodePath = getNodePath(reservationRoot, planName); + String planNodePath = getNodePath(reservationRoot, planName); List reservationNodes = getChildren(planNodePath); + for (String reservationNodeName : reservationNodes) { - String reservationNodePath = getNodePath(planNodePath, - reservationNodeName); + String reservationNodePath = + getNodePath(planNodePath, reservationNodeName); + if (LOG.isDebugEnabled()) { LOG.debug("Loading reservation from znode: " + reservationNodePath); } + byte[] reservationData = getData(reservationNodePath); ReservationAllocationStateProto allocationState = ReservationAllocationStateProto.parseFrom(reservationData); + if (!rmState.getReservationState().containsKey(planName)) { - rmState.getReservationState().put(planName, - new HashMap()); + rmState.getReservationState().put(planName, new HashMap<>()); } + ReservationId reservationId = ReservationId.parseReservationId(reservationNodeName); rmState.getReservationState().get(planName).put(reservationId, @@ -403,16 +426,17 @@ public class ZKRMStateStore extends RMStateStore { private void loadAMRMTokenSecretManagerState(RMState rmState) throws Exception { byte[] data = getData(amrmTokenSecretManagerRoot); + if (data == null) { LOG.warn("There is no data saved"); - return; + } else { + AMRMTokenSecretManagerStatePBImpl stateData = + new AMRMTokenSecretManagerStatePBImpl( + AMRMTokenSecretManagerStateProto.parseFrom(data)); + rmState.amrmTokenSecretManagerState = + AMRMTokenSecretManagerState.newInstance( + stateData.getCurrentMasterKey(), stateData.getNextMasterKey()); } - AMRMTokenSecretManagerStatePBImpl stateData = - new AMRMTokenSecretManagerStatePBImpl( - AMRMTokenSecretManagerStateProto.parseFrom(data)); - rmState.amrmTokenSecretManagerState = - AMRMTokenSecretManagerState.newInstance( - stateData.getCurrentMasterKey(), stateData.getNextMasterKey()); } private synchronized void loadRMDTSecretManagerState(RMState rmState) @@ -423,8 +447,8 @@ public class ZKRMStateStore extends RMStateStore { } private void loadRMDelegationKeyState(RMState rmState) throws Exception { - List childNodes = - getChildren(dtMasterKeysRootPath); + List childNodes = getChildren(dtMasterKeysRootPath); + for (String childNodeName : childNodes) { String childNodePath = getNodePath(dtMasterKeysRootPath, childNodeName); byte[] childData = getData(childNodePath); @@ -435,34 +459,30 @@ public class ZKRMStateStore extends RMStateStore { } ByteArrayInputStream is = new ByteArrayInputStream(childData); - DataInputStream fsIn = new DataInputStream(is); - try { + try (DataInputStream fsIn = new DataInputStream(is)) { if (childNodeName.startsWith(DELEGATION_KEY_PREFIX)) { DelegationKey key = new DelegationKey(); key.readFields(fsIn); rmState.rmSecretManagerState.masterKeyState.add(key); + if (LOG.isDebugEnabled()) { LOG.debug("Loaded delegation key: keyId=" + key.getKeyId() + ", expirationDate=" + key.getExpiryDate()); } } - } finally { - is.close(); } } } private void loadRMSequentialNumberState(RMState rmState) throws Exception { byte[] seqData = getData(dtSequenceNumberPath); + if (seqData != null) { ByteArrayInputStream seqIs = new ByteArrayInputStream(seqData); - DataInputStream seqIn = new DataInputStream(seqIs); - try { + try (DataInputStream seqIn = new DataInputStream(seqIs)) { rmState.rmSecretManagerState.dtSequenceNumber = seqIn.readInt(); - } finally { - seqIn.close(); } } } @@ -470,6 +490,7 @@ public class ZKRMStateStore extends RMStateStore { private void loadRMDelegationTokenState(RMState rmState) throws Exception { List childNodes = getChildren(delegationTokensRootPath); + for (String childNodeName : childNodes) { String childNodePath = getNodePath(delegationTokensRootPath, childNodeName); @@ -481,9 +502,8 @@ public class ZKRMStateStore extends RMStateStore { } ByteArrayInputStream is = new ByteArrayInputStream(childData); - DataInputStream fsIn = new DataInputStream(is); - try { + try (DataInputStream fsIn = new DataInputStream(is)) { if (childNodeName.startsWith(DELEGATION_TOKEN_PREFIX)) { RMDelegationTokenIdentifierData identifierData = new RMDelegationTokenIdentifierData(); @@ -493,36 +513,40 @@ public class ZKRMStateStore extends RMStateStore { long renewDate = identifierData.getRenewDate(); rmState.rmSecretManagerState.delegationTokenState.put(identifier, renewDate); + if (LOG.isDebugEnabled()) { LOG.debug("Loaded RMDelegationTokenIdentifier: " + identifier + " renewDate=" + renewDate); } } - } finally { - is.close(); } } } private synchronized void loadRMAppState(RMState rmState) throws Exception { List childNodes = getChildren(rmAppRoot); + for (String childNodeName : childNodes) { String childNodePath = getNodePath(rmAppRoot, childNodeName); byte[] childData = getData(childNodePath); + if (childNodeName.startsWith(ApplicationId.appIdStrPrefix)) { // application if (LOG.isDebugEnabled()) { LOG.debug("Loading application from znode: " + childNodeName); } + ApplicationId appId = ApplicationId.fromString(childNodeName); ApplicationStateDataPBImpl appState = new ApplicationStateDataPBImpl( ApplicationStateDataProto.parseFrom(childData)); + if (!appId.equals( appState.getApplicationSubmissionContext().getApplicationId())) { - throw new YarnRuntimeException("The child node name is different " + - "from the application id"); + throw new YarnRuntimeException("The child node name is different " + + "from the application id"); } + rmState.appState.put(appId, appState); loadApplicationAttemptState(appState, appId); } else { @@ -536,6 +560,7 @@ public class ZKRMStateStore extends RMStateStore { throws Exception { String appPath = getNodePath(rmAppRoot, appId.toString()); List attempts = getChildren(appPath); + for (String attemptIDStr : attempts) { if (attemptIDStr.startsWith(ApplicationAttemptId.appAttemptIdStrPrefix)) { String attemptPath = getNodePath(appPath, attemptIDStr); @@ -548,6 +573,7 @@ public class ZKRMStateStore extends RMStateStore { appState.attempts.put(attemptState.getAttemptId(), attemptState); } } + LOG.debug("Done loading applications from ZK state store"); } @@ -559,21 +585,23 @@ public class ZKRMStateStore extends RMStateStore { if (LOG.isDebugEnabled()) { LOG.debug("Storing info for app: " + appId + " at: " + nodeCreatePath); } + byte[] appStateData = appStateDataPB.getProto().toByteArray(); safeCreate(nodeCreatePath, appStateData, zkAcl, CreateMode.PERSISTENT); - } @Override - public synchronized void updateApplicationStateInternal(ApplicationId appId, - ApplicationStateData appStateDataPB) throws Exception { + protected synchronized void updateApplicationStateInternal( + ApplicationId appId, ApplicationStateData appStateDataPB) + throws Exception { String nodeUpdatePath = getNodePath(rmAppRoot, appId.toString()); if (LOG.isDebugEnabled()) { LOG.debug("Storing final state info for app: " + appId + " at: " + nodeUpdatePath); } + byte[] appStateData = appStateDataPB.getProto().toByteArray(); if (exists(nodeUpdatePath)) { @@ -587,7 +615,7 @@ public class ZKRMStateStore extends RMStateStore { } @Override - public synchronized void storeApplicationAttemptStateInternal( + protected synchronized void storeApplicationAttemptStateInternal( ApplicationAttemptId appAttemptId, ApplicationAttemptStateData attemptStateDataPB) throws Exception { @@ -599,13 +627,13 @@ public class ZKRMStateStore extends RMStateStore { LOG.debug("Storing info for attempt: " + appAttemptId + " at: " + nodeCreatePath); } + byte[] attemptStateData = attemptStateDataPB.getProto().toByteArray(); - safeCreate(nodeCreatePath, attemptStateData, zkAcl, - CreateMode.PERSISTENT); + safeCreate(nodeCreatePath, attemptStateData, zkAcl, CreateMode.PERSISTENT); } @Override - public synchronized void updateApplicationAttemptStateInternal( + protected synchronized void updateApplicationAttemptStateInternal( ApplicationAttemptId appAttemptId, ApplicationAttemptStateData attemptStateDataPB) throws Exception { @@ -613,10 +641,12 @@ public class ZKRMStateStore extends RMStateStore { String appAttemptIdStr = appAttemptId.toString(); String appDirPath = getNodePath(rmAppRoot, appIdStr); String nodeUpdatePath = getNodePath(appDirPath, appAttemptIdStr); + if (LOG.isDebugEnabled()) { LOG.debug("Storing final state info for attempt: " + appAttemptIdStr + " at: " + nodeUpdatePath); } + byte[] attemptStateData = attemptStateDataPB.getProto().toByteArray(); if (exists(nodeUpdatePath)) { @@ -630,25 +660,24 @@ public class ZKRMStateStore extends RMStateStore { } @Override - public synchronized void removeApplicationAttemptInternal( - ApplicationAttemptId appAttemptId) - throws Exception { + protected synchronized void removeApplicationAttemptInternal( + ApplicationAttemptId appAttemptId) throws Exception { String appId = appAttemptId.getApplicationId().toString(); String appIdRemovePath = getNodePath(rmAppRoot, appId); - String attemptIdRemovePath = getNodePath(appIdRemovePath, - appAttemptId.toString()); + String attemptIdRemovePath = + getNodePath(appIdRemovePath, appAttemptId.toString()); if (LOG.isDebugEnabled()) { LOG.debug("Removing info for attempt: " + appAttemptId + " at: " + attemptIdRemovePath); } + safeDelete(attemptIdRemovePath); } @Override - public synchronized void removeApplicationStateInternal( - ApplicationStateData appState) - throws Exception { + protected synchronized void removeApplicationStateInternal( + ApplicationStateData appState) throws Exception { String appId = appState.getApplicationSubmissionContext().getApplicationId() .toString(); String appIdRemovePath = getNodePath(rmAppRoot, appId); @@ -659,9 +688,11 @@ public class ZKRMStateStore extends RMStateStore { } for (ApplicationAttemptId attemptId : appState.attempts.keySet()) { - String attemptRemovePath = getNodePath(appIdRemovePath, attemptId.toString()); + String attemptRemovePath = + getNodePath(appIdRemovePath, attemptId.toString()); safeDelete(attemptRemovePath); } + safeDelete(appIdRemovePath); } @@ -680,10 +711,12 @@ public class ZKRMStateStore extends RMStateStore { String nodeRemovePath = getNodePath(delegationTokensRootPath, DELEGATION_TOKEN_PREFIX + rmDTIdentifier.getSequenceNumber()); + if (LOG.isDebugEnabled()) { LOG.debug("Removing RMDelegationToken_" + rmDTIdentifier.getSequenceNumber()); } + safeDelete(nodeRemovePath); } @@ -695,6 +728,7 @@ public class ZKRMStateStore extends RMStateStore { String nodeRemovePath = getNodePath(delegationTokensRootPath, DELEGATION_TOKEN_PREFIX + rmDTIdentifier.getSequenceNumber()); + if (exists(nodeRemovePath)) { // in case znode exists addStoreOrUpdateOps(trx, rmDTIdentifier, renewDate, true); @@ -703,6 +737,7 @@ public class ZKRMStateStore extends RMStateStore { addStoreOrUpdateOps(trx, rmDTIdentifier, renewDate, false); LOG.debug("Attempted to update a non-existing znode " + nodeRemovePath); } + trx.commit(); } @@ -710,17 +745,16 @@ public class ZKRMStateStore extends RMStateStore { RMDelegationTokenIdentifier rmDTIdentifier, Long renewDate, boolean isUpdate) throws Exception { // store RM delegation token - String nodeCreatePath = - getNodePath(delegationTokensRootPath, DELEGATION_TOKEN_PREFIX - + rmDTIdentifier.getSequenceNumber()); - ByteArrayOutputStream seqOs = new ByteArrayOutputStream(); - DataOutputStream seqOut = new DataOutputStream(seqOs); + String nodeCreatePath = getNodePath(delegationTokensRootPath, + DELEGATION_TOKEN_PREFIX + rmDTIdentifier.getSequenceNumber()); RMDelegationTokenIdentifierData identifierData = new RMDelegationTokenIdentifierData(rmDTIdentifier, renewDate); - try { + ByteArrayOutputStream seqOs = new ByteArrayOutputStream(); + + try (DataOutputStream seqOut = new DataOutputStream(seqOs)) { if (LOG.isDebugEnabled()) { - LOG.debug((isUpdate ? "Storing " : "Updating ") + "RMDelegationToken_" + - rmDTIdentifier.getSequenceNumber()); + LOG.debug((isUpdate ? "Storing " : "Updating ") + "RMDelegationToken_" + + rmDTIdentifier.getSequenceNumber()); } if (isUpdate) { @@ -730,24 +764,23 @@ public class ZKRMStateStore extends RMStateStore { CreateMode.PERSISTENT); // Update Sequence number only while storing DT seqOut.writeInt(rmDTIdentifier.getSequenceNumber()); + if (LOG.isDebugEnabled()) { - LOG.debug((isUpdate ? "Storing " : "Updating ") + - dtSequenceNumberPath + ". SequenceNumber: " + LOG.debug((isUpdate ? "Storing " : "Updating ") + + dtSequenceNumberPath + ". SequenceNumber: " + rmDTIdentifier.getSequenceNumber()); } + trx.setData(dtSequenceNumberPath, seqOs.toByteArray(), -1); } - } finally { - seqOs.close(); } } @Override protected synchronized void storeRMDTMasterKeyState( DelegationKey delegationKey) throws Exception { - String nodeCreatePath = - getNodePath(dtMasterKeysRootPath, DELEGATION_KEY_PREFIX - + delegationKey.getKeyId()); + String nodeCreatePath = getNodePath(dtMasterKeysRootPath, + DELEGATION_KEY_PREFIX + delegationKey.getKeyId()); if (LOG.isDebugEnabled()) { LOG.debug("Storing RMDelegationKey_" + delegationKey.getKeyId()); } @@ -765,9 +798,11 @@ public class ZKRMStateStore extends RMStateStore { String nodeRemovePath = getNodePath(dtMasterKeysRootPath, DELEGATION_KEY_PREFIX + delegationKey.getKeyId()); + if (LOG.isDebugEnabled()) { LOG.debug("Removing RMDelegationKey_" + delegationKey.getKeyId()); } + safeDelete(nodeRemovePath); } @@ -789,30 +824,31 @@ public class ZKRMStateStore extends RMStateStore { } @Override - public synchronized void storeOrUpdateAMRMTokenSecretManagerState( + protected synchronized void storeOrUpdateAMRMTokenSecretManagerState( AMRMTokenSecretManagerState amrmTokenSecretManagerState, boolean isUpdate) throws Exception { AMRMTokenSecretManagerState data = AMRMTokenSecretManagerState.newInstance(amrmTokenSecretManagerState); byte[] stateData = data.getProto().toByteArray(); + safeSetData(amrmTokenSecretManagerRoot, stateData, -1); } @Override protected synchronized void removeReservationState(String planName, - String reservationIdName) - throws Exception { - String planNodePath = - getNodePath(reservationRoot, planName); - String reservationPath = getNodePath(planNodePath, - reservationIdName); + String reservationIdName) throws Exception { + String planNodePath = getNodePath(reservationRoot, planName); + String reservationPath = getNodePath(planNodePath, reservationIdName); + if (LOG.isDebugEnabled()) { - LOG.debug("Removing reservationallocation " + reservationIdName + " for" + - " plan " + planName); + LOG.debug("Removing reservationallocation " + reservationIdName + + " for" + " plan " + planName); } + safeDelete(reservationPath); List reservationNodes = getChildren(planNodePath); + if (reservationNodes.isEmpty()) { safeDelete(planNodePath); } @@ -821,11 +857,10 @@ public class ZKRMStateStore extends RMStateStore { @Override protected synchronized void storeReservationState( ReservationAllocationStateProto reservationAllocation, String planName, - String reservationIdName) - throws Exception { + String reservationIdName) throws Exception { SafeTransaction trx = new SafeTransaction(); - addOrUpdateReservationState( - reservationAllocation, planName, reservationIdName, trx, false); + addOrUpdateReservationState(reservationAllocation, planName, + reservationIdName, trx, false); trx.commit(); } @@ -843,6 +878,7 @@ public class ZKRMStateStore extends RMStateStore { if (LOG.isDebugEnabled()) { LOG.debug("Creating plan node: " + planName + " at: " + planCreatePath); } + trx.create(planCreatePath, null, zkAcl, CreateMode.PERSISTENT); } @@ -871,6 +907,7 @@ public class ZKRMStateStore extends RMStateStore { Preconditions.checkArgument(pathParts.length >= 1 && pathParts[0].isEmpty(), "Invalid path: %s", path); StringBuilder sb = new StringBuilder(); + for (int i = 1; i < pathParts.length; i++) { sb.append("/").append(pathParts[i]); create(sb.toString()); @@ -947,10 +984,9 @@ public class ZKRMStateStore extends RMStateStore { SafeTransaction() throws Exception { CuratorTransaction transaction = curatorFramework.inTransaction(); - transactionFinal = - transaction.create() - .withMode(CreateMode.PERSISTENT).withACL(zkAcl) - .forPath(fencingNodePath, new byte[0]).and(); + transactionFinal = transaction.create() + .withMode(CreateMode.PERSISTENT).withACL(zkAcl) + .forPath(fencingNodePath, new byte[0]).and(); } public void commit() throws Exception { @@ -985,19 +1021,17 @@ public class ZKRMStateStore extends RMStateStore { super(VerifyActiveStatusThread.class.getName()); } + @Override public void run() { try { - while (true) { - if(isFencedState()) { - break; - } + while (!isFencedState()) { // Create and delete fencing node new SafeTransaction().commit(); Thread.sleep(zkSessionTimeout); } } catch (InterruptedException ie) { - LOG.info(VerifyActiveStatusThread.class.getName() + " thread " + - "interrupted! Exiting!"); + LOG.info(getName() + " thread interrupted! Exiting!"); + interrupt(); } catch (Exception e) { notifyStoreOperationFailed(new StoreFencedException()); }