YARN-1307. Redesign znode structure for Zookeeper based RM state-store for better organization and scalability. Contributed by Tsuyoshi OZAWA.

svn merge --ignore-ancestry -c 1552209 ../../trunk/


git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1552210 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Vinod Kumar Vavilapalli 2013-12-19 02:33:33 +00:00
parent 8dbe8d2165
commit ba1286fec1
9 changed files with 277 additions and 156 deletions

View File

@ -166,6 +166,9 @@ Release 2.4.0 - UNRELEASED
YARN-1446. Changed client API to retry killing application till RM YARN-1446. Changed client API to retry killing application till RM
acknowledges so as to account for RM crashes/failover. (Jian He via vinodkv) acknowledges so as to account for RM crashes/failover. (Jian He via vinodkv)
YARN-1307. Redesign znode structure for Zookeeper based RM state-store for
better organization and scalability. (Tsuyoshi OZAWA via vinodkv)
OPTIMIZATIONS OPTIMIZATIONS
BUG FIXES BUG FIXES

View File

@ -287,11 +287,12 @@ public class FileSystemRMStateStore extends RMStateStore {
} }
@Override @Override
public synchronized void storeApplicationStateInternal(String appId, public synchronized void storeApplicationStateInternal(ApplicationId appId,
ApplicationStateDataPBImpl appStateDataPB) throws Exception { ApplicationStateDataPBImpl appStateDataPB) throws Exception {
Path appDirPath = getAppDir(rmAppRoot, appId); String appIdStr = appId.toString();
Path appDirPath = getAppDir(rmAppRoot, appIdStr);
fs.mkdirs(appDirPath); fs.mkdirs(appDirPath);
Path nodeCreatePath = getNodePath(appDirPath, appId); Path nodeCreatePath = getNodePath(appDirPath, appIdStr);
LOG.info("Storing info for app: " + appId + " at: " + nodeCreatePath); LOG.info("Storing info for app: " + appId + " at: " + nodeCreatePath);
byte[] appStateData = appStateDataPB.getProto().toByteArray(); byte[] appStateData = appStateDataPB.getProto().toByteArray();
@ -306,10 +307,11 @@ public class FileSystemRMStateStore extends RMStateStore {
} }
@Override @Override
public synchronized void updateApplicationStateInternal(String appId, public synchronized void updateApplicationStateInternal(ApplicationId appId,
ApplicationStateDataPBImpl appStateDataPB) throws Exception { ApplicationStateDataPBImpl appStateDataPB) throws Exception {
Path appDirPath = getAppDir(rmAppRoot, appId); String appIdStr = appId.toString();
Path nodeCreatePath = getNodePath(appDirPath, appId); Path appDirPath = getAppDir(rmAppRoot, appIdStr);
Path nodeCreatePath = getNodePath(appDirPath, appIdStr);
LOG.info("Updating info for app: " + appId + " at: " + nodeCreatePath); LOG.info("Updating info for app: " + appId + " at: " + nodeCreatePath);
byte[] appStateData = appStateDataPB.getProto().toByteArray(); byte[] appStateData = appStateDataPB.getProto().toByteArray();
@ -325,14 +327,13 @@ public class FileSystemRMStateStore extends RMStateStore {
@Override @Override
public synchronized void storeApplicationAttemptStateInternal( public synchronized void storeApplicationAttemptStateInternal(
String attemptId, ApplicationAttemptStateDataPBImpl attemptStateDataPB) ApplicationAttemptId appAttemptId,
ApplicationAttemptStateDataPBImpl attemptStateDataPB)
throws Exception { throws Exception {
ApplicationAttemptId appAttemptId =
ConverterUtils.toApplicationAttemptId(attemptId);
Path appDirPath = Path appDirPath =
getAppDir(rmAppRoot, appAttemptId.getApplicationId().toString()); getAppDir(rmAppRoot, appAttemptId.getApplicationId().toString());
Path nodeCreatePath = getNodePath(appDirPath, attemptId); Path nodeCreatePath = getNodePath(appDirPath, appAttemptId.toString());
LOG.info("Storing info for attempt: " + attemptId + " at: " LOG.info("Storing info for attempt: " + appAttemptId + " at: "
+ nodeCreatePath); + nodeCreatePath);
byte[] attemptStateData = attemptStateDataPB.getProto().toByteArray(); byte[] attemptStateData = attemptStateDataPB.getProto().toByteArray();
try { try {
@ -340,21 +341,20 @@ public class FileSystemRMStateStore extends RMStateStore {
// based on whether we have lost the right to write to FS // based on whether we have lost the right to write to FS
writeFile(nodeCreatePath, attemptStateData); writeFile(nodeCreatePath, attemptStateData);
} catch (Exception e) { } catch (Exception e) {
LOG.info("Error storing info for attempt: " + attemptId, e); LOG.info("Error storing info for attempt: " + appAttemptId, e);
throw e; throw e;
} }
} }
@Override @Override
public synchronized void updateApplicationAttemptStateInternal( public synchronized void updateApplicationAttemptStateInternal(
String attemptId, ApplicationAttemptStateDataPBImpl attemptStateDataPB) ApplicationAttemptId appAttemptId,
ApplicationAttemptStateDataPBImpl attemptStateDataPB)
throws Exception { throws Exception {
ApplicationAttemptId appAttemptId =
ConverterUtils.toApplicationAttemptId(attemptId);
Path appDirPath = Path appDirPath =
getAppDir(rmAppRoot, appAttemptId.getApplicationId().toString()); getAppDir(rmAppRoot, appAttemptId.getApplicationId().toString());
Path nodeCreatePath = getNodePath(appDirPath, attemptId); Path nodeCreatePath = getNodePath(appDirPath, appAttemptId.toString());
LOG.info("Updating info for attempt: " + attemptId + " at: " LOG.info("Updating info for attempt: " + appAttemptId + " at: "
+ nodeCreatePath); + nodeCreatePath);
byte[] attemptStateData = attemptStateDataPB.getProto().toByteArray(); byte[] attemptStateData = attemptStateDataPB.getProto().toByteArray();
try { try {
@ -362,7 +362,7 @@ public class FileSystemRMStateStore extends RMStateStore {
// based on whether we have lost the right to write to FS // based on whether we have lost the right to write to FS
updateFile(nodeCreatePath, attemptStateData); updateFile(nodeCreatePath, attemptStateData);
} catch (Exception e) { } catch (Exception e) {
LOG.info("Error updating info for attempt: " + attemptId, e); LOG.info("Error updating info for attempt: " + appAttemptId, e);
throw e; throw e;
} }
} }

View File

@ -80,7 +80,7 @@ public class MemoryRMStateStore extends RMStateStore {
} }
@Override @Override
public void storeApplicationStateInternal(String appId, public void storeApplicationStateInternal(ApplicationId appId,
ApplicationStateDataPBImpl appStateData) ApplicationStateDataPBImpl appStateData)
throws Exception { throws Exception {
ApplicationState appState = ApplicationState appState =
@ -88,11 +88,11 @@ public class MemoryRMStateStore extends RMStateStore {
appStateData.getStartTime(), appStateData.getStartTime(),
appStateData.getApplicationSubmissionContext(), appStateData.getApplicationSubmissionContext(),
appStateData.getUser()); appStateData.getUser());
state.appState.put(appState.getAppId(), appState); state.appState.put(appId, appState);
} }
@Override @Override
public void updateApplicationStateInternal(String appId, public void updateApplicationStateInternal(ApplicationId appId,
ApplicationStateDataPBImpl appStateData) throws Exception { ApplicationStateDataPBImpl appStateData) throws Exception {
ApplicationState updatedAppState = ApplicationState updatedAppState =
new ApplicationState(appStateData.getSubmitTime(), new ApplicationState(appStateData.getSubmitTime(),
@ -102,21 +102,19 @@ public class MemoryRMStateStore extends RMStateStore {
appStateData.getDiagnostics(), appStateData.getFinishTime()); appStateData.getDiagnostics(), appStateData.getFinishTime());
LOG.info("Updating final state " + appStateData.getState() + " for app: " LOG.info("Updating final state " + appStateData.getState() + " for app: "
+ appId); + appId);
ApplicationId applicationId = updatedAppState.getAppId(); if (state.appState.get(appId) != null) {
if (state.appState.get(applicationId) != null) {
// add the earlier attempts back // add the earlier attempts back
updatedAppState.attempts updatedAppState.attempts
.putAll(state.appState.get(applicationId).attempts); .putAll(state.appState.get(appId).attempts);
} }
state.appState.put(applicationId, updatedAppState); state.appState.put(appId, updatedAppState);
} }
@Override @Override
public synchronized void storeApplicationAttemptStateInternal(String attemptIdStr, public synchronized void storeApplicationAttemptStateInternal(
ApplicationAttemptStateDataPBImpl attemptStateData) ApplicationAttemptId appAttemptId,
throws Exception { ApplicationAttemptStateDataPBImpl attemptStateData)
ApplicationAttemptId attemptId = ConverterUtils throws Exception {
.toApplicationAttemptId(attemptIdStr);
Credentials credentials = null; Credentials credentials = null;
if(attemptStateData.getAppAttemptTokens() != null){ if(attemptStateData.getAppAttemptTokens() != null){
DataInputByteBuffer dibb = new DataInputByteBuffer(); DataInputByteBuffer dibb = new DataInputByteBuffer();
@ -125,7 +123,7 @@ public class MemoryRMStateStore extends RMStateStore {
credentials.readTokenStorageStream(dibb); credentials.readTokenStorageStream(dibb);
} }
ApplicationAttemptState attemptState = ApplicationAttemptState attemptState =
new ApplicationAttemptState(attemptId, new ApplicationAttemptState(appAttemptId,
attemptStateData.getMasterContainer(), credentials, attemptStateData.getMasterContainer(), credentials,
attemptStateData.getStartTime()); attemptStateData.getStartTime());
@ -139,10 +137,9 @@ public class MemoryRMStateStore extends RMStateStore {
@Override @Override
public synchronized void updateApplicationAttemptStateInternal( public synchronized void updateApplicationAttemptStateInternal(
String attemptIdStr, ApplicationAttemptStateDataPBImpl attemptStateData) ApplicationAttemptId appAttemptId,
ApplicationAttemptStateDataPBImpl attemptStateData)
throws Exception { throws Exception {
ApplicationAttemptId attemptId =
ConverterUtils.toApplicationAttemptId(attemptIdStr);
Credentials credentials = null; Credentials credentials = null;
if (attemptStateData.getAppAttemptTokens() != null) { if (attemptStateData.getAppAttemptTokens() != null) {
DataInputByteBuffer dibb = new DataInputByteBuffer(); DataInputByteBuffer dibb = new DataInputByteBuffer();
@ -151,7 +148,7 @@ public class MemoryRMStateStore extends RMStateStore {
credentials.readTokenStorageStream(dibb); credentials.readTokenStorageStream(dibb);
} }
ApplicationAttemptState updatedAttemptState = ApplicationAttemptState updatedAttemptState =
new ApplicationAttemptState(attemptId, new ApplicationAttemptState(appAttemptId,
attemptStateData.getMasterContainer(), credentials, attemptStateData.getMasterContainer(), credentials,
attemptStateData.getStartTime(), attemptStateData.getState(), attemptStateData.getStartTime(), attemptStateData.getState(),
attemptStateData.getFinalTrackingUrl(), attemptStateData.getFinalTrackingUrl(),

View File

@ -22,6 +22,8 @@ package org.apache.hadoop.yarn.server.resourcemanager.recovery;
import org.apache.hadoop.classification.InterfaceStability.Unstable; import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.security.token.delegation.DelegationKey; import org.apache.hadoop.security.token.delegation.DelegationKey;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier; import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.RMStateVersion; import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.RMStateVersion;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.ApplicationAttemptStateDataPBImpl; import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.ApplicationAttemptStateDataPBImpl;
@ -51,13 +53,13 @@ public class NullRMStateStore extends RMStateStore {
} }
@Override @Override
protected void storeApplicationStateInternal(String appId, protected void storeApplicationStateInternal(ApplicationId appId,
ApplicationStateDataPBImpl appStateData) throws Exception { ApplicationStateDataPBImpl appStateData) throws Exception {
// Do nothing // Do nothing
} }
@Override @Override
protected void storeApplicationAttemptStateInternal(String attemptId, protected void storeApplicationAttemptStateInternal(ApplicationAttemptId attemptId,
ApplicationAttemptStateDataPBImpl attemptStateData) throws Exception { ApplicationAttemptStateDataPBImpl attemptStateData) throws Exception {
// Do nothing // Do nothing
} }
@ -92,13 +94,13 @@ public class NullRMStateStore extends RMStateStore {
} }
@Override @Override
protected void updateApplicationStateInternal(String appId, protected void updateApplicationStateInternal(ApplicationId appId,
ApplicationStateDataPBImpl appStateData) throws Exception { ApplicationStateDataPBImpl appStateData) throws Exception {
// Do nothing // Do nothing
} }
@Override @Override
protected void updateApplicationAttemptStateInternal(String attemptId, protected void updateApplicationAttemptStateInternal(ApplicationAttemptId attemptId,
ApplicationAttemptStateDataPBImpl attemptStateData) throws Exception { ApplicationAttemptStateDataPBImpl attemptStateData) throws Exception {
} }

View File

@ -387,10 +387,10 @@ public abstract class RMStateStore extends AbstractService {
* Derived classes must implement this method to store the state of an * Derived classes must implement this method to store the state of an
* application. * application.
*/ */
protected abstract void storeApplicationStateInternal(String appId, protected abstract void storeApplicationStateInternal(ApplicationId appId,
ApplicationStateDataPBImpl appStateData) throws Exception; ApplicationStateDataPBImpl appStateData) throws Exception;
protected abstract void updateApplicationStateInternal(String appId, protected abstract void updateApplicationStateInternal(ApplicationId appId,
ApplicationStateDataPBImpl appStateData) throws Exception; ApplicationStateDataPBImpl appStateData) throws Exception;
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
@ -424,10 +424,12 @@ public abstract class RMStateStore extends AbstractService {
* Derived classes must implement this method to store the state of an * Derived classes must implement this method to store the state of an
* application attempt * application attempt
*/ */
protected abstract void storeApplicationAttemptStateInternal(String attemptId, protected abstract void storeApplicationAttemptStateInternal(
ApplicationAttemptId attemptId,
ApplicationAttemptStateDataPBImpl attemptStateData) throws Exception; ApplicationAttemptStateDataPBImpl attemptStateData) throws Exception;
protected abstract void updateApplicationAttemptStateInternal(String attemptId, protected abstract void updateApplicationAttemptStateInternal(
ApplicationAttemptId attemptId,
ApplicationAttemptStateDataPBImpl attemptStateData) throws Exception; ApplicationAttemptStateDataPBImpl attemptStateData) throws Exception;
/** /**
@ -592,11 +594,11 @@ public abstract class RMStateStore extends AbstractService {
LOG.info("Storing info for app: " + appId); LOG.info("Storing info for app: " + appId);
try { try {
if (event.getType().equals(RMStateStoreEventType.STORE_APP)) { if (event.getType().equals(RMStateStoreEventType.STORE_APP)) {
storeApplicationStateInternal(appId.toString(), appStateData); storeApplicationStateInternal(appId, appStateData);
notifyDoneStoringApplication(appId, storedException); notifyDoneStoringApplication(appId, storedException);
} else { } else {
assert event.getType().equals(RMStateStoreEventType.UPDATE_APP); assert event.getType().equals(RMStateStoreEventType.UPDATE_APP);
updateApplicationStateInternal(appId.toString(), appStateData); updateApplicationStateInternal(appId, appStateData);
notifyDoneUpdatingApplication(appId, storedException); notifyDoneUpdatingApplication(appId, storedException);
} }
} catch (Exception e) { } catch (Exception e) {
@ -637,15 +639,15 @@ public abstract class RMStateStore extends AbstractService {
LOG.debug("Storing info for attempt: " + attemptState.getAttemptId()); LOG.debug("Storing info for attempt: " + attemptState.getAttemptId());
} }
if (event.getType().equals(RMStateStoreEventType.STORE_APP_ATTEMPT)) { if (event.getType().equals(RMStateStoreEventType.STORE_APP_ATTEMPT)) {
storeApplicationAttemptStateInternal(attemptState.getAttemptId() storeApplicationAttemptStateInternal(attemptState.getAttemptId(),
.toString(), attemptStateData); attemptStateData);
notifyDoneStoringApplicationAttempt(attemptState.getAttemptId(), notifyDoneStoringApplicationAttempt(attemptState.getAttemptId(),
storedException); storedException);
} else { } else {
assert event.getType().equals( assert event.getType().equals(
RMStateStoreEventType.UPDATE_APP_ATTEMPT); RMStateStoreEventType.UPDATE_APP_ATTEMPT);
updateApplicationAttemptStateInternal(attemptState.getAttemptId() updateApplicationAttemptStateInternal(attemptState.getAttemptId(),
.toString(), attemptStateData); attemptStateData);
notifyDoneUpdatingApplicationAttempt(attemptState.getAttemptId(), notifyDoneUpdatingApplicationAttempt(attemptState.getAttemptId(),
storedException); storedException);
} }

View File

@ -78,16 +78,51 @@ public class ZKRMStateStore extends RMStateStore {
protected static final String ROOT_ZNODE_NAME = "ZKRMStateRoot"; protected static final String ROOT_ZNODE_NAME = "ZKRMStateRoot";
protected static final RMStateVersion CURRENT_VERSION_INFO = RMStateVersion protected static final RMStateVersion CURRENT_VERSION_INFO = RMStateVersion
.newInstance(1, 0); .newInstance(1, 0);
private static final String RM_DELEGATION_TOKENS_ROOT_ZNODE_NAME =
"RMDelegationTokensRoot";
private static final String RM_DT_SEQUENTIAL_NUMBER_ZNODE_NAME =
"RMDTSequentialNumber";
private static final String RM_DT_MASTER_KEYS_ROOT_ZNODE_NAME =
"RMDTMasterKeysRoot";
private int numRetries; private int numRetries;
private String zkHostPort = null; private String zkHostPort = null;
private int zkSessionTimeout; private int zkSessionTimeout;
private long zkRetryInterval; private long zkRetryInterval;
private List<ACL> zkAcl; private List<ACL> zkAcl;
/**
*
* ROOT_DIR_PATH
* |--- VERSION_INFO
* |--- RM_ZK_FENCING_LOCK
* |--- RM_APP_ROOT
* | |----- (#ApplicationId1)
* | | |----- (#ApplicationAttemptIds)
* | |
* | |----- (#ApplicationId2)
* | | |----- (#ApplicationAttemptIds)
* | ....
* |
* |--- RM_DT_SECRET_MANAGER_ROOT
* |----- RM_DT_SEQUENTIAL_NUMBER_ZNODE_NAME
* |----- RM_DELEGATION_TOKENS_ROOT_ZNODE_NAME
* | |----- Token_1
* | |----- Token_2
* | ....
* |
* |----- RM_DT_MASTER_KEYS_ROOT_ZNODE_NAME
* | |----- Key_1
* | |----- Key_2
* ....
*
*/
private String zkRootNodePath; private String zkRootNodePath;
private String rmDTSecretManagerRoot;
private String rmAppRoot; private String rmAppRoot;
private String dtSequenceNumberPath = null; private String rmDTSecretManagerRoot;
private String dtMasterKeysRootPath;
private String delegationTokensRootPath;
private String dtSequenceNumberPath;
@VisibleForTesting @VisibleForTesting
protected String znodeWorkingPath; protected String znodeWorkingPath;
@ -178,12 +213,11 @@ public class ZKRMStateStore extends RMStateStore {
throw bafe; throw bafe;
} }
zkRootNodePath = znodeWorkingPath + "/" + ROOT_ZNODE_NAME; zkRootNodePath = getNodePath(znodeWorkingPath, ROOT_ZNODE_NAME);
rmDTSecretManagerRoot = zkRootNodePath + "/" + RM_DT_SECRET_MANAGER_ROOT; rmAppRoot = getNodePath(zkRootNodePath, RM_APP_ROOT);
rmAppRoot = zkRootNodePath + "/" + RM_APP_ROOT;
/* Initialize fencing related paths, acls, and ops */ /* Initialize fencing related paths, acls, and ops */
fencingNodePath = zkRootNodePath + "/" + FENCING_LOCK; fencingNodePath = getNodePath(zkRootNodePath, FENCING_LOCK);
createFencingNodePathOp = Op.create(fencingNodePath, new byte[0], zkAcl, createFencingNodePathOp = Op.create(fencingNodePath, new byte[0], zkAcl,
CreateMode.PERSISTENT); CreateMode.PERSISTENT);
deleteFencingNodePathOp = Op.delete(fencingNodePath, -1); deleteFencingNodePathOp = Op.delete(fencingNodePath, -1);
@ -204,6 +238,15 @@ public class ZKRMStateStore extends RMStateStore {
zkRootNodeAcl = constructZkRootNodeACL(conf, zkAcl); zkRootNodeAcl = constructZkRootNodeACL(conf, zkAcl);
} }
} }
rmDTSecretManagerRoot =
getNodePath(zkRootNodePath, RM_DT_SECRET_MANAGER_ROOT);
dtMasterKeysRootPath = getNodePath(rmDTSecretManagerRoot,
RM_DT_MASTER_KEYS_ROOT_ZNODE_NAME);
delegationTokensRootPath = getNodePath(rmDTSecretManagerRoot,
RM_DELEGATION_TOKENS_ROOT_ZNODE_NAME);
dtSequenceNumberPath = getNodePath(rmDTSecretManagerRoot,
RM_DT_SEQUENTIAL_NUMBER_ZNODE_NAME);
} }
@Override @Override
@ -217,8 +260,11 @@ public class ZKRMStateStore extends RMStateStore {
if (HAUtil.isHAEnabled(getConfig())){ if (HAUtil.isHAEnabled(getConfig())){
fence(); fence();
} }
createRootDir(rmDTSecretManagerRoot);
createRootDir(rmAppRoot); createRootDir(rmAppRoot);
createRootDir(rmDTSecretManagerRoot);
createRootDir(dtMasterKeysRootPath);
createRootDir(delegationTokensRootPath);
createRootDir(dtSequenceNumberPath);
} }
private void createRootDir(final String rootPath) throws Exception { private void createRootDir(final String rootPath) throws Exception {
@ -350,26 +396,69 @@ public class ZKRMStateStore extends RMStateStore {
private synchronized void loadRMDTSecretManagerState(RMState rmState) private synchronized void loadRMDTSecretManagerState(RMState rmState)
throws Exception { throws Exception {
List<String> childNodes = loadRMDelegationKeyState(rmState);
getChildrenWithRetries(rmDTSecretManagerRoot, true); loadRMSequentialNumberState(rmState);
loadRMDelegationTokenState(rmState);
}
private void loadRMDelegationKeyState(RMState rmState) throws Exception {
List<String> childNodes =
getChildrenWithRetries(dtMasterKeysRootPath, true);
for (String childNodeName : childNodes) { for (String childNodeName : childNodes) {
if (childNodeName.startsWith(DELEGATION_TOKEN_SEQUENCE_NUMBER_PREFIX)) { String childNodePath = getNodePath(dtMasterKeysRootPath, childNodeName);
rmState.rmSecretManagerState.dtSequenceNumber = byte[] childData = getDataWithRetries(childNodePath, true);
Integer.parseInt(childNodeName.split("_")[1]);
if (childData == null) {
LOG.warn("Content of " + childNodePath + " is broken.");
continue; continue;
} }
String childNodePath = getNodePath(rmDTSecretManagerRoot, childNodeName);
byte[] childData = getDataWithRetries(childNodePath, true);
ByteArrayInputStream is = new ByteArrayInputStream(childData); ByteArrayInputStream is = new ByteArrayInputStream(childData);
DataInputStream fsIn = new DataInputStream(is); DataInputStream fsIn = new DataInputStream(is);
try { try {
if (childNodeName.startsWith(DELEGATION_KEY_PREFIX)) { if (childNodeName.startsWith(DELEGATION_KEY_PREFIX)) {
DelegationKey key = new DelegationKey(); DelegationKey key = new DelegationKey();
key.readFields(fsIn); key.readFields(fsIn);
rmState.rmSecretManagerState.masterKeyState.add(key); rmState.rmSecretManagerState.masterKeyState.add(key);
} else if (childNodeName.startsWith(DELEGATION_TOKEN_PREFIX)) { }
} finally {
is.close();
}
}
}
private void loadRMSequentialNumberState(RMState rmState) throws Exception {
byte[] seqData = getDataWithRetries(dtSequenceNumberPath, false);
if (seqData != null) {
ByteArrayInputStream seqIs = new ByteArrayInputStream(seqData);
DataInputStream seqIn = new DataInputStream(seqIs);
try {
rmState.rmSecretManagerState.dtSequenceNumber = seqIn.readInt();
} finally {
seqIn.close();
}
}
}
private void loadRMDelegationTokenState(RMState rmState) throws Exception {
List<String> childNodes = zkClient.getChildren(delegationTokensRootPath, true);
for (String childNodeName : childNodes) {
String childNodePath =
getNodePath(delegationTokensRootPath, childNodeName);
byte[] childData = getDataWithRetries(childNodePath, true);
if (childData == null) {
LOG.warn("Content of " + childNodePath + " is broken.");
continue;
}
ByteArrayInputStream is = new ByteArrayInputStream(childData);
DataInputStream fsIn = new DataInputStream(is);
try {
if (childNodeName.startsWith(DELEGATION_TOKEN_PREFIX)) {
RMDelegationTokenIdentifier identifier = RMDelegationTokenIdentifier identifier =
new RMDelegationTokenIdentifier(); new RMDelegationTokenIdentifier();
identifier.readFields(fsIn); identifier.readFields(fsIn);
@ -385,8 +474,6 @@ public class ZKRMStateStore extends RMStateStore {
private synchronized void loadRMAppState(RMState rmState) throws Exception { private synchronized void loadRMAppState(RMState rmState) throws Exception {
List<String> childNodes = getChildrenWithRetries(rmAppRoot, true); List<String> childNodes = getChildrenWithRetries(rmAppRoot, true);
List<ApplicationAttemptState> attempts =
new ArrayList<ApplicationAttemptState>();
for (String childNodeName : childNodes) { for (String childNodeName : childNodes) {
String childNodePath = getNodePath(rmAppRoot, childNodeName); String childNodePath = getNodePath(rmAppRoot, childNodeName);
byte[] childData = getDataWithRetries(childNodePath, true); byte[] childData = getDataWithRetries(childNodePath, true);
@ -411,17 +498,28 @@ public class ZKRMStateStore extends RMStateStore {
"from the application id"); "from the application id");
} }
rmState.appState.put(appId, appState); rmState.appState.put(appId, appState);
} else if (childNodeName loadApplicationAttemptState(appState, appId);
.startsWith(ApplicationAttemptId.appAttemptIdStrPrefix)) { } else {
// attempt LOG.info("Unknown child node with name: " + childNodeName);
if (LOG.isDebugEnabled()) { }
LOG.debug("Loading application attempt from znode: " + childNodeName); }
} }
private void loadApplicationAttemptState(ApplicationState appState,
ApplicationId appId)
throws Exception {
String appPath = getNodePath(rmAppRoot, appId.toString());
List<String> attempts = getChildrenWithRetries(appPath, false);
for (String attemptIDStr : attempts) {
if (attemptIDStr.startsWith(ApplicationAttemptId.appAttemptIdStrPrefix)) {
String attemptPath = getNodePath(appPath, attemptIDStr);
byte[] attemptData = getDataWithRetries(attemptPath, true);
ApplicationAttemptId attemptId = ApplicationAttemptId attemptId =
ConverterUtils.toApplicationAttemptId(childNodeName); ConverterUtils.toApplicationAttemptId(attemptIDStr);
ApplicationAttemptStateDataPBImpl attemptStateData = ApplicationAttemptStateDataPBImpl attemptStateData =
new ApplicationAttemptStateDataPBImpl( new ApplicationAttemptStateDataPBImpl(
ApplicationAttemptStateDataProto.parseFrom(childData)); ApplicationAttemptStateDataProto.parseFrom(attemptData));
Credentials credentials = null; Credentials credentials = null;
if (attemptStateData.getAppAttemptTokens() != null) { if (attemptStateData.getAppAttemptTokens() != null) {
credentials = new Credentials(); credentials = new Credentials();
@ -429,47 +527,26 @@ public class ZKRMStateStore extends RMStateStore {
dibb.reset(attemptStateData.getAppAttemptTokens()); dibb.reset(attemptStateData.getAppAttemptTokens());
credentials.readTokenStorageStream(dibb); credentials.readTokenStorageStream(dibb);
} }
ApplicationAttemptState attemptState = ApplicationAttemptState attemptState =
new ApplicationAttemptState(attemptId, new ApplicationAttemptState(attemptId,
attemptStateData.getMasterContainer(), credentials, attemptStateData.getMasterContainer(), credentials,
attemptStateData.getStartTime(), attemptStateData.getStartTime(),
attemptStateData.getState(), attemptStateData.getState(),
attemptStateData.getFinalTrackingUrl(), attemptStateData.getFinalTrackingUrl(),
attemptStateData.getDiagnostics(), attemptStateData.getDiagnostics(),
attemptStateData.getFinalApplicationStatus()); attemptStateData.getFinalApplicationStatus());
if (!attemptId.equals(attemptState.getAttemptId())) {
throw new YarnRuntimeException("The child node name is different " +
"from the application attempt id");
}
attempts.add(attemptState);
} else {
LOG.info("Unknown child node with name: " + childNodeName);
}
}
// go through all attempts and add them to their apps
for (ApplicationAttemptState attemptState : attempts) {
ApplicationId appId = attemptState.getAttemptId().getApplicationId();
ApplicationState appState = rmState.appState.get(appId);
if (appState != null) {
appState.attempts.put(attemptState.getAttemptId(), attemptState); appState.attempts.put(attemptState.getAttemptId(), attemptState);
} else {
// the application znode may have been removed when the application
// completed but the RM might have stopped before it could remove the
// application attempt znodes
LOG.info("Application node not found for attempt: "
+ attemptState.getAttemptId());
deleteWithRetries(
getNodePath(rmAppRoot, attemptState.getAttemptId().toString()), -1);
} }
} }
LOG.info("Done Loading applications from ZK state store"); LOG.info("Done Loading applications from ZK state store");
} }
@Override @Override
public synchronized void storeApplicationStateInternal(String appId, public synchronized void storeApplicationStateInternal(ApplicationId appId,
ApplicationStateDataPBImpl appStateDataPB) throws Exception { ApplicationStateDataPBImpl appStateDataPB) throws Exception {
String nodeCreatePath = getNodePath(rmAppRoot, appId); String nodeCreatePath = getNodePath(rmAppRoot, appId.toString());
if (LOG.isDebugEnabled()) { if (LOG.isDebugEnabled()) {
LOG.debug("Storing info for app: " + appId + " at: " + nodeCreatePath); LOG.debug("Storing info for app: " + appId + " at: " + nodeCreatePath);
@ -481,25 +558,29 @@ public class ZKRMStateStore extends RMStateStore {
} }
@Override @Override
public synchronized void updateApplicationStateInternal(String appId, public synchronized void updateApplicationStateInternal(ApplicationId appId,
ApplicationStateDataPBImpl appStateDataPB) throws Exception { ApplicationStateDataPBImpl appStateDataPB) throws Exception {
String nodeCreatePath = getNodePath(rmAppRoot, appId); String nodeUpdatePath = getNodePath(rmAppRoot, appId.toString());
if (LOG.isDebugEnabled()) { if (LOG.isDebugEnabled()) {
LOG.debug("Storing final state info for app: " + appId + " at: " LOG.debug("Storing final state info for app: " + appId + " at: "
+ nodeCreatePath); + nodeUpdatePath);
} }
byte[] appStateData = appStateDataPB.getProto().toByteArray(); byte[] appStateData = appStateDataPB.getProto().toByteArray();
setDataWithRetries(nodeCreatePath, appStateData, 0); setDataWithRetries(nodeUpdatePath, appStateData, 0);
} }
@Override @Override
public synchronized void storeApplicationAttemptStateInternal( public synchronized void storeApplicationAttemptStateInternal(
String attemptId, ApplicationAttemptStateDataPBImpl attemptStateDataPB) ApplicationAttemptId appAttemptId,
ApplicationAttemptStateDataPBImpl attemptStateDataPB)
throws Exception { throws Exception {
String nodeCreatePath = getNodePath(rmAppRoot, attemptId); String appDirPath = getNodePath(rmAppRoot,
appAttemptId.getApplicationId().toString());
String nodeCreatePath = getNodePath(appDirPath, appAttemptId.toString());
if (LOG.isDebugEnabled()) { if (LOG.isDebugEnabled()) {
LOG.debug("Storing info for attempt: " + attemptId + " at: " LOG.debug("Storing info for attempt: " + appAttemptId + " at: "
+ nodeCreatePath); + nodeCreatePath);
} }
byte[] attemptStateData = attemptStateDataPB.getProto().toByteArray(); byte[] attemptStateData = attemptStateDataPB.getProto().toByteArray();
@ -509,31 +590,36 @@ public class ZKRMStateStore extends RMStateStore {
@Override @Override
public synchronized void updateApplicationAttemptStateInternal( public synchronized void updateApplicationAttemptStateInternal(
String attemptId, ApplicationAttemptStateDataPBImpl attemptStateDataPB) ApplicationAttemptId appAttemptId,
ApplicationAttemptStateDataPBImpl attemptStateDataPB)
throws Exception { throws Exception {
String nodeCreatePath = getNodePath(rmAppRoot, attemptId); String appIdStr = appAttemptId.getApplicationId().toString();
String appAttemptIdStr = appAttemptId.toString();
String appDirPath = getNodePath(rmAppRoot, appIdStr);
String nodeUpdatePath = getNodePath(appDirPath, appAttemptIdStr);
if (LOG.isDebugEnabled()) { if (LOG.isDebugEnabled()) {
LOG.debug("Storing final state info for attempt: " + attemptId + " at: " LOG.debug("Storing final state info for attempt: " + appAttemptIdStr
+ nodeCreatePath); + " at: " + nodeUpdatePath);
} }
byte[] attemptStateData = attemptStateDataPB.getProto().toByteArray(); byte[] attemptStateData = attemptStateDataPB.getProto().toByteArray();
setDataWithRetries(nodeCreatePath, attemptStateData, 0); setDataWithRetries(nodeUpdatePath, attemptStateData, 0);
} }
@Override @Override
public synchronized void removeApplicationStateInternal(ApplicationState appState) public synchronized void removeApplicationStateInternal(ApplicationState appState)
throws Exception { throws Exception {
String appId = appState.getAppId().toString(); String appId = appState.getAppId().toString();
String nodeRemovePath = getNodePath(rmAppRoot, appId); String appIdRemovePath = getNodePath(rmAppRoot, appId);
ArrayList<Op> opList = new ArrayList<Op>(); ArrayList<Op> opList = new ArrayList<Op>();
opList.add(Op.delete(nodeRemovePath, -1));
for (ApplicationAttemptId attemptId : appState.attempts.keySet()) { for (ApplicationAttemptId attemptId : appState.attempts.keySet()) {
String attemptRemovePath = getNodePath(rmAppRoot, attemptId.toString()); String attemptRemovePath = getNodePath(appIdRemovePath, attemptId.toString());
opList.add(Op.delete(attemptRemovePath, -1)); opList.add(Op.delete(attemptRemovePath, -1));
} }
opList.add(Op.delete(appIdRemovePath, -1));
if (LOG.isDebugEnabled()) { if (LOG.isDebugEnabled()) {
LOG.debug("Removing info for app: " + appId + " at: " + nodeRemovePath LOG.debug("Removing info for app: " + appId + " at: " + appIdRemovePath
+ " and its attempts."); + " and its attempts.");
} }
doMultiWithRetries(opList); doMultiWithRetries(opList);
@ -546,38 +632,37 @@ public class ZKRMStateStore extends RMStateStore {
ArrayList<Op> opList = new ArrayList<Op>(); ArrayList<Op> opList = new ArrayList<Op>();
// store RM delegation token // store RM delegation token
String nodeCreatePath = String nodeCreatePath =
getNodePath(rmDTSecretManagerRoot, DELEGATION_TOKEN_PREFIX getNodePath(delegationTokensRootPath, DELEGATION_TOKEN_PREFIX
+ rmDTIdentifier.getSequenceNumber()); + rmDTIdentifier.getSequenceNumber());
ByteArrayOutputStream os = new ByteArrayOutputStream(); ByteArrayOutputStream tokenOs = new ByteArrayOutputStream();
DataOutputStream fsOut = new DataOutputStream(os); DataOutputStream tokenOut = new DataOutputStream(tokenOs);
ByteArrayOutputStream seqOs = new ByteArrayOutputStream();
DataOutputStream seqOut = new DataOutputStream(seqOs);
try { try {
rmDTIdentifier.write(fsOut); rmDTIdentifier.write(tokenOut);
fsOut.writeLong(renewDate); tokenOut.writeLong(renewDate);
if (LOG.isDebugEnabled()) { if (LOG.isDebugEnabled()) {
LOG.debug("Storing RMDelegationToken_" + LOG.debug("Storing RMDelegationToken_" +
rmDTIdentifier.getSequenceNumber()); rmDTIdentifier.getSequenceNumber());
} }
opList.add(Op.create(nodeCreatePath, os.toByteArray(), zkAcl,
opList.add(Op.create(nodeCreatePath, tokenOs.toByteArray(), zkAcl,
CreateMode.PERSISTENT)); CreateMode.PERSISTENT));
seqOut.writeInt(latestSequenceNumber);
if (LOG.isDebugEnabled()) {
LOG.debug("Storing " + dtSequenceNumberPath +
". SequenceNumber: " + latestSequenceNumber);
}
opList.add(Op.setData(dtSequenceNumberPath, seqOs.toByteArray(), -1));
} finally { } finally {
os.close(); tokenOs.close();
seqOs.close();
} }
// store sequence number
String latestSequenceNumberPath =
getNodePath(rmDTSecretManagerRoot,
DELEGATION_TOKEN_SEQUENCE_NUMBER_PREFIX + latestSequenceNumber);
if (LOG.isDebugEnabled()) {
LOG.debug("Storing " + DELEGATION_TOKEN_SEQUENCE_NUMBER_PREFIX +
latestSequenceNumber);
}
if (dtSequenceNumberPath != null) {
opList.add(Op.delete(dtSequenceNumberPath, -1));
}
opList.add(Op.create(latestSequenceNumberPath, null, zkAcl,
CreateMode.PERSISTENT));
dtSequenceNumberPath = latestSequenceNumberPath;
doMultiWithRetries(opList); doMultiWithRetries(opList);
} }
@ -585,7 +670,7 @@ public class ZKRMStateStore extends RMStateStore {
protected synchronized void removeRMDelegationTokenState( protected synchronized void removeRMDelegationTokenState(
RMDelegationTokenIdentifier rmDTIdentifier) throws Exception { RMDelegationTokenIdentifier rmDTIdentifier) throws Exception {
String nodeRemovePath = String nodeRemovePath =
getNodePath(rmDTSecretManagerRoot, DELEGATION_TOKEN_PREFIX getNodePath(delegationTokensRootPath, DELEGATION_TOKEN_PREFIX
+ rmDTIdentifier.getSequenceNumber()); + rmDTIdentifier.getSequenceNumber());
if (LOG.isDebugEnabled()) { if (LOG.isDebugEnabled()) {
LOG.debug("Removing RMDelegationToken_" LOG.debug("Removing RMDelegationToken_"
@ -598,7 +683,7 @@ public class ZKRMStateStore extends RMStateStore {
protected synchronized void storeRMDTMasterKeyState( protected synchronized void storeRMDTMasterKeyState(
DelegationKey delegationKey) throws Exception { DelegationKey delegationKey) throws Exception {
String nodeCreatePath = String nodeCreatePath =
getNodePath(rmDTSecretManagerRoot, DELEGATION_KEY_PREFIX getNodePath(dtMasterKeysRootPath, DELEGATION_KEY_PREFIX
+ delegationKey.getKeyId()); + delegationKey.getKeyId());
ByteArrayOutputStream os = new ByteArrayOutputStream(); ByteArrayOutputStream os = new ByteArrayOutputStream();
DataOutputStream fsOut = new DataOutputStream(os); DataOutputStream fsOut = new DataOutputStream(os);
@ -618,7 +703,7 @@ public class ZKRMStateStore extends RMStateStore {
protected synchronized void removeRMDTMasterKeyState( protected synchronized void removeRMDTMasterKeyState(
DelegationKey delegationKey) throws Exception { DelegationKey delegationKey) throws Exception {
String nodeRemovePath = String nodeRemovePath =
getNodePath(rmDTSecretManagerRoot, DELEGATION_KEY_PREFIX getNodePath(dtMasterKeysRootPath, DELEGATION_KEY_PREFIX
+ delegationKey.getKeyId()); + delegationKey.getKeyId());
if (LOG.isDebugEnabled()) { if (LOG.isDebugEnabled()) {
LOG.debug("Removing RMDelegationKey_" + delegationKey.getKeyId()); LOG.debug("Removing RMDelegationKey_" + delegationKey.getKeyId());
@ -757,8 +842,7 @@ public class ZKRMStateStore extends RMStateStore {
return new ZKAction<byte[]>() { return new ZKAction<byte[]>() {
@Override @Override
public byte[] run() throws KeeperException, InterruptedException { public byte[] run() throws KeeperException, InterruptedException {
Stat stat = new Stat(); return zkClient.getData(path, watch, null);
return zkClient.getData(path, watch, stat);
} }
}.runWithRetries(); }.runWithRetries();
} }
@ -865,4 +949,5 @@ public class ZKRMStateStore extends RMStateStore {
zk.register(new ForwardingWatcher()); zk.register(new ForwardingWatcher());
return zk; return zk;
} }
} }

View File

@ -683,14 +683,14 @@ public class TestRMRestart {
MemoryRMStateStore memStore = new MemoryRMStateStore() { MemoryRMStateStore memStore = new MemoryRMStateStore() {
@Override @Override
public synchronized void storeApplicationAttemptStateInternal( public synchronized void storeApplicationAttemptStateInternal(
String attemptIdStr, ApplicationAttemptId attemptId,
ApplicationAttemptStateDataPBImpl attemptStateData) throws Exception { ApplicationAttemptStateDataPBImpl attemptStateData) throws Exception {
// ignore attempt saving request. // ignore attempt saving request.
} }
@Override @Override
public synchronized void updateApplicationAttemptStateInternal( public synchronized void updateApplicationAttemptStateInternal(
String attemptIdStr, ApplicationAttemptId attemptId,
ApplicationAttemptStateDataPBImpl attemptStateData) throws Exception { ApplicationAttemptStateDataPBImpl attemptStateData) throws Exception {
// ignore attempt saving request. // ignore attempt saving request.
} }
@ -1540,7 +1540,7 @@ public class TestRMRestart {
public int updateAttempt = 0; public int updateAttempt = 0;
@Override @Override
public void updateApplicationStateInternal(String appId, public void updateApplicationStateInternal(ApplicationId appId,
ApplicationStateDataPBImpl appStateData) throws Exception { ApplicationStateDataPBImpl appStateData) throws Exception {
updateApp = ++count; updateApp = ++count;
super.updateApplicationStateInternal(appId, appStateData); super.updateApplicationStateInternal(appId, appStateData);
@ -1548,11 +1548,12 @@ public class TestRMRestart {
@Override @Override
public synchronized void public synchronized void
updateApplicationAttemptStateInternal(String attemptIdStr, updateApplicationAttemptStateInternal(
ApplicationAttemptId attemptId,
ApplicationAttemptStateDataPBImpl attemptStateData) ApplicationAttemptStateDataPBImpl attemptStateData)
throws Exception { throws Exception {
updateAttempt = ++count; updateAttempt = ++count;
super.updateApplicationAttemptStateInternal(attemptIdStr, super.updateApplicationAttemptStateInternal(attemptId,
attemptStateData); attemptStateData);
} }
} }

View File

@ -234,6 +234,12 @@ public class RMStateStoreTestBase extends ClientBaseWithFixes{
attempts.put(attemptIdRemoved, mockRemovedAttempt); attempts.put(attemptIdRemoved, mockRemovedAttempt);
store.removeApplication(mockRemovedApp); store.removeApplication(mockRemovedApp);
// remove application directory recursively.
storeApp(store, appIdRemoved, submitTime, startTime);
storeAttempt(store, attemptIdRemoved,
"container_1352994193343_0002_01_000001", null, null, dispatcher);
store.removeApplication(mockRemovedApp);
// let things settle down // let things settle down
Thread.sleep(1000); Thread.sleep(1000);
store.close(); store.close();
@ -373,7 +379,30 @@ public class RMStateStoreTestBase extends ClientBaseWithFixes{
Assert.assertEquals(keySet, secretManagerState.getMasterKeyState()); Assert.assertEquals(keySet, secretManagerState.getMasterKeyState());
Assert.assertEquals(sequenceNumber, Assert.assertEquals(sequenceNumber,
secretManagerState.getDTSequenceNumber()); secretManagerState.getDTSequenceNumber());
// check to delete delegationKey
store.removeRMDTMasterKey(key);
keySet.clear();
RMDTSecretManagerState noKeySecretManagerState =
store.loadState().getRMDTSecretManagerState();
Assert.assertEquals(token1, noKeySecretManagerState.getTokenState());
Assert.assertEquals(keySet, noKeySecretManagerState.getMasterKeyState());
Assert.assertEquals(sequenceNumber,
noKeySecretManagerState.getDTSequenceNumber());
// check to delete delegationToken
store.removeRMDelegationToken(dtId1, sequenceNumber);
RMDTSecretManagerState noKeyAndTokenSecretManagerState =
store.loadState().getRMDTSecretManagerState();
token1.clear();
Assert.assertEquals(token1,
noKeyAndTokenSecretManagerState.getTokenState());
Assert.assertEquals(keySet,
noKeyAndTokenSecretManagerState.getMasterKeyState());
Assert.assertEquals(sequenceNumber,
noKeySecretManagerState.getDTSequenceNumber());
store.close(); store.close();
} }
private Token<AMRMTokenIdentifier> generateAMRMToken( private Token<AMRMTokenIdentifier> generateAMRMToken(

View File

@ -35,6 +35,7 @@ import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.HdfsConfiguration; import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.MiniDFSCluster; import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.RMStateVersion; import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.RMStateVersion;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.ApplicationStateDataPBImpl; import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.ApplicationStateDataPBImpl;
@ -178,10 +179,11 @@ public class TestFSRMStateStore extends RMStateStoreTestBase {
@Override @Override
public void run() { public void run() {
try { try {
store.storeApplicationStateInternal("application1", store.storeApplicationStateInternal(
(ApplicationStateDataPBImpl) ApplicationStateDataPBImpl ApplicationId.newInstance(100L, 1),
.newApplicationStateData(111, 111, "user", null, (ApplicationStateDataPBImpl) ApplicationStateDataPBImpl
RMAppState.ACCEPTED, "diagnostics", 333)); .newApplicationStateData(111, 111, "user", null,
RMAppState.ACCEPTED, "diagnostics", 333));
} catch (Exception e) { } catch (Exception e) {
// TODO 0 datanode exception will not be retried by dfs client, fix // TODO 0 datanode exception will not be retried by dfs client, fix
// that separately. // that separately.