Merge r1504261 from trunk to branch-2 for YARN-922. Change FileSystemRMStateStore to use directories (Jian He via bikas)

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1504264 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Bikas Saha 2013-07-17 20:23:55 +00:00
parent 2dba05af50
commit 859b53435f
3 changed files with 76 additions and 92 deletions

View File

@ -483,6 +483,9 @@ Release 2.1.0-beta - 2013-07-02
YARN-927. Change ContainerRequest to not have more than 1 container count YARN-927. Change ContainerRequest to not have more than 1 container count
and remove StoreContainerRequest (bikas) and remove StoreContainerRequest (bikas)
YARN-922. Change FileSystemRMStateStore to use directories (Jian He via
bikas)
OPTIMIZATIONS OPTIMIZATIONS
YARN-512. Log aggregation root directory check is more expensive than it YARN-512. Log aggregation root directory check is more expensive than it

View File

@ -111,14 +111,15 @@ public class FileSystemRMStateStore extends RMStateStore {
private void loadRMAppState(RMState rmState) throws Exception { private void loadRMAppState(RMState rmState) throws Exception {
try { try {
FileStatus[] childNodes = fs.listStatus(rmAppRoot);
List<ApplicationAttemptState> attempts = List<ApplicationAttemptState> attempts =
new ArrayList<ApplicationAttemptState>(); new ArrayList<ApplicationAttemptState>();
for(FileStatus childNodeStatus : childNodes) {
for (FileStatus appDir : fs.listStatus(rmAppRoot)) {
for (FileStatus childNodeStatus : fs.listStatus(appDir.getPath())) {
assert childNodeStatus.isFile(); assert childNodeStatus.isFile();
String childNodeName = childNodeStatus.getPath().getName(); String childNodeName = childNodeStatus.getPath().getName();
Path childNodePath = getNodePath(rmAppRoot, childNodeName); byte[] childData =
byte[] childData = readFile(childNodePath, childNodeStatus.getLen()); readFile(childNodeStatus.getPath(), childNodeStatus.getLen());
if (childNodeName.startsWith(ApplicationId.appIdStrPrefix)) { if (childNodeName.startsWith(ApplicationId.appIdStrPrefix)) {
// application // application
LOG.info("Loading application from node: " + childNodeName); LOG.info("Loading application from node: " + childNodeName);
@ -126,15 +127,15 @@ public class FileSystemRMStateStore extends RMStateStore {
ApplicationStateDataPBImpl appStateData = ApplicationStateDataPBImpl appStateData =
new ApplicationStateDataPBImpl( new ApplicationStateDataPBImpl(
ApplicationStateDataProto.parseFrom(childData)); ApplicationStateDataProto.parseFrom(childData));
ApplicationState appState = new ApplicationState( ApplicationState appState =
appStateData.getSubmitTime(), new ApplicationState(appStateData.getSubmitTime(),
appStateData.getApplicationSubmissionContext(), appStateData.getApplicationSubmissionContext(),
appStateData.getUser()); appStateData.getUser());
// assert child node name is same as actual applicationId // assert child node name is same as actual applicationId
assert appId.equals(appState.context.getApplicationId()); assert appId.equals(appState.context.getApplicationId());
rmState.appState.put(appId, appState); rmState.appState.put(appId, appState);
} else if(childNodeName.startsWith( } else if (childNodeName
ApplicationAttemptId.appAttemptIdStrPrefix)) { .startsWith(ApplicationAttemptId.appAttemptIdStrPrefix)) {
// attempt // attempt
LOG.info("Loading application attempt from node: " + childNodeName); LOG.info("Loading application attempt from node: " + childNodeName);
ApplicationAttemptId attemptId = ApplicationAttemptId attemptId =
@ -160,21 +161,16 @@ public class FileSystemRMStateStore extends RMStateStore {
LOG.info("Unknown child node with name: " + childNodeName); LOG.info("Unknown child node with name: " + childNodeName);
} }
} }
}
// go through all attempts and add them to their apps // go through all attempts and add them to their apps, Ideally, each
// attempt node must have a corresponding app node, because remove
// directory operation remove both at the same time
for (ApplicationAttemptState attemptState : attempts) { for (ApplicationAttemptState attemptState : attempts) {
ApplicationId appId = attemptState.getAttemptId().getApplicationId(); ApplicationId appId = attemptState.getAttemptId().getApplicationId();
ApplicationState appState = rmState.appState.get(appId); ApplicationState appState = rmState.appState.get(appId);
if(appState != null) { assert appState != null;
appState.attempts.put(attemptState.getAttemptId(), attemptState); appState.attempts.put(attemptState.getAttemptId(), attemptState);
} else {
// the application node may have been removed when the application
// completed but the RM might have stopped before it could remove the
// application attempt nodes
LOG.info("Application node not found for attempt: "
+ attemptState.getAttemptId());
deleteFile(getNodePath(rmAppRoot, attemptState.getAttemptId().toString()));
}
} }
} catch (Exception e) { } catch (Exception e) {
LOG.error("Failed to load state.", e); LOG.error("Failed to load state.", e);
@ -188,6 +184,12 @@ public class FileSystemRMStateStore extends RMStateStore {
for(FileStatus childNodeStatus : childNodes) { for(FileStatus childNodeStatus : childNodes) {
assert childNodeStatus.isFile(); assert childNodeStatus.isFile();
String childNodeName = childNodeStatus.getPath().getName(); String childNodeName = childNodeStatus.getPath().getName();
if(childNodeName.startsWith(DELEGATION_TOKEN_SEQUENCE_NUMBER_PREFIX)) {
rmState.rmSecretManagerState.dtSequenceNumber =
Integer.parseInt(childNodeName.split("_")[1]);
continue;
}
Path childNodePath = getNodePath(rmDTSecretManagerRoot, childNodeName); Path childNodePath = getNodePath(rmDTSecretManagerRoot, childNodeName);
byte[] childData = readFile(childNodePath, childNodeStatus.getLen()); byte[] childData = readFile(childNodePath, childNodeStatus.getLen());
ByteArrayInputStream is = new ByteArrayInputStream(childData); ByteArrayInputStream is = new ByteArrayInputStream(childData);
@ -202,9 +204,6 @@ public class FileSystemRMStateStore extends RMStateStore {
long renewDate = fsIn.readLong(); long renewDate = fsIn.readLong();
rmState.rmSecretManagerState.delegationTokenState.put(identifier, rmState.rmSecretManagerState.delegationTokenState.put(identifier,
renewDate); renewDate);
} else if(childNodeName.startsWith(DELEGATION_TOKEN_SEQUENCE_NUMBER_PREFIX)) {
rmState.rmSecretManagerState.dtSequenceNumber =
Integer.parseInt(childNodeName.split("_")[1]);
} else { } else {
LOG.warn("Unknown file for recovering RMDelegationTokenSecretManager"); LOG.warn("Unknown file for recovering RMDelegationTokenSecretManager");
} }
@ -215,7 +214,9 @@ public class FileSystemRMStateStore extends RMStateStore {
@Override @Override
public synchronized void storeApplicationState(String appId, public synchronized void storeApplicationState(String appId,
ApplicationStateDataPBImpl appStateDataPB) throws Exception { ApplicationStateDataPBImpl appStateDataPB) throws Exception {
Path nodeCreatePath = getNodePath(rmAppRoot, appId); Path appDirPath = getAppDir(rmAppRoot, appId);
fs.mkdirs(appDirPath);
Path nodeCreatePath = getNodePath(appDirPath, appId);
LOG.info("Storing info for app: " + appId + " at: " + nodeCreatePath); LOG.info("Storing info for app: " + appId + " at: " + nodeCreatePath);
byte[] appStateData = appStateDataPB.getProto().toByteArray(); byte[] appStateData = appStateDataPB.getProto().toByteArray();
@ -232,7 +233,11 @@ public class FileSystemRMStateStore extends RMStateStore {
@Override @Override
public synchronized void storeApplicationAttemptState(String attemptId, public synchronized void storeApplicationAttemptState(String attemptId,
ApplicationAttemptStateDataPBImpl attemptStateDataPB) throws Exception { ApplicationAttemptStateDataPBImpl attemptStateDataPB) throws Exception {
Path nodeCreatePath = getNodePath(rmAppRoot, attemptId); ApplicationAttemptId appAttemptId =
ConverterUtils.toApplicationAttemptId(attemptId);
Path appDirPath =
getAppDir(rmAppRoot, appAttemptId.getApplicationId().toString());
Path nodeCreatePath = getNodePath(appDirPath, attemptId);
LOG.info("Storing info for attempt: " + attemptId LOG.info("Storing info for attempt: " + attemptId
+ " at: " + nodeCreatePath); + " at: " + nodeCreatePath);
byte[] attemptStateData = attemptStateDataPB.getProto().toByteArray(); byte[] attemptStateData = attemptStateDataPB.getProto().toByteArray();
@ -250,20 +255,9 @@ public class FileSystemRMStateStore extends RMStateStore {
public synchronized void removeApplicationState(ApplicationState appState) public synchronized void removeApplicationState(ApplicationState appState)
throws Exception { throws Exception {
String appId = appState.getAppId().toString(); String appId = appState.getAppId().toString();
Path nodeRemovePath = getNodePath(rmAppRoot, appId); Path nodeRemovePath = getAppDir(rmAppRoot, appId);
LOG.info("Removing info for app: " + appId + " at: " + nodeRemovePath); LOG.info("Removing info for app: " + appId + " at: " + nodeRemovePath);
deleteFile(nodeRemovePath); deleteFile(nodeRemovePath);
for(ApplicationAttemptId attemptId : appState.attempts.keySet()) {
removeApplicationAttemptState(attemptId.toString());
}
}
public synchronized void removeApplicationAttemptState(String attemptId)
throws Exception {
Path nodeRemovePath = getNodePath(rmAppRoot, attemptId);
LOG.info("Removing info for attempt: " + attemptId
+ " at: " + nodeRemovePath);
deleteFile(nodeRemovePath);
} }
@Override @Override
@ -329,6 +323,10 @@ public class FileSystemRMStateStore extends RMStateStore {
deleteFile(nodeCreatePath); deleteFile(nodeCreatePath);
} }
private Path getAppDir(Path root, String appId) {
return getNodePath(root, appId);
}
// FileSystem related code // FileSystem related code
private void deleteFile(Path deletePath) throws Exception { private void deleteFile(Path deletePath) throws Exception {

View File

@ -105,8 +105,6 @@ public class TestRMStateStore {
interface RMStateStoreHelper { interface RMStateStoreHelper {
RMStateStore getRMStateStore() throws Exception; RMStateStore getRMStateStore() throws Exception;
void addOrphanAttemptIfNeeded(RMStateStore testStore,
TestDispatcher dispatcher) throws Exception;
boolean isFinalStateValid() throws Exception; boolean isFinalStateValid() throws Exception;
} }
@ -153,15 +151,6 @@ public class TestRMStateStore {
return store; return store;
} }
@Override
public void addOrphanAttemptIfNeeded(RMStateStore testStore,
TestDispatcher dispatcher) throws Exception {
ApplicationAttemptId attemptId = ConverterUtils.toApplicationAttemptId(
"appattempt_1352994193343_0003_000001");
storeAttempt(testStore, attemptId,
"container_1352994193343_0003_01_000001", null, null, dispatcher);
}
@Override @Override
public boolean isFinalStateValid() throws Exception { public boolean isFinalStateValid() throws Exception {
FileSystem fs = cluster.getFileSystem(); FileSystem fs = cluster.getFileSystem();
@ -289,9 +278,6 @@ public class TestRMStateStore {
attempts.put(attemptIdRemoved, mockRemovedAttempt); attempts.put(attemptIdRemoved, mockRemovedAttempt);
store.removeApplication(mockRemovedApp); store.removeApplication(mockRemovedApp);
// add orphan attempt file to simulate incomplete removal of app state
stateStoreHelper.addOrphanAttemptIfNeeded(store, dispatcher);
// let things settle down // let things settle down
Thread.sleep(1000); Thread.sleep(1000);
store.close(); store.close();
@ -301,9 +287,6 @@ public class TestRMStateStore {
RMState state = store.loadState(); RMState state = store.loadState();
Map<ApplicationId, ApplicationState> rmAppState = state.getApplicationState(); Map<ApplicationId, ApplicationState> rmAppState = state.getApplicationState();
// removed app or orphan attempt is not loaded
assertEquals(1, rmAppState.size());
ApplicationState appState = rmAppState.get(appId1); ApplicationState appState = rmAppState.get(appId1);
// app is loaded // app is loaded
assertNotNull(appState); assertNotNull(appState);