From 9f3e488936b6fbfc30e4b0cc42667288355345d1 Mon Sep 17 00:00:00 2001 From: Bikas Saha Date: Wed, 17 Jul 2013 20:19:49 +0000 Subject: [PATCH] YARN-922. Change FileSystemRMStateStore to use directories (Jian He via bikas) git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1504261 13f79535-47bb-0310-9956-ffa450edef68 --- hadoop-yarn-project/CHANGES.txt | 3 + .../recovery/FileSystemRMStateStore.java | 148 +++++++++--------- .../recovery/TestRMStateStore.java | 17 -- 3 files changed, 76 insertions(+), 92 deletions(-) diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt index 5f9ddd0d1ef..014456659ad 100644 --- a/hadoop-yarn-project/CHANGES.txt +++ b/hadoop-yarn-project/CHANGES.txt @@ -500,6 +500,9 @@ Release 2.1.0-beta - 2013-07-02 YARN-927. Change ContainerRequest to not have more than 1 container count and remove StoreContainerRequest (bikas) + YARN-922. Change FileSystemRMStateStore to use directories (Jian He via + bikas) + OPTIMIZATIONS YARN-512. Log aggregation root directory check is more expensive than it diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/FileSystemRMStateStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/FileSystemRMStateStore.java index 8da33737351..a5262b001fd 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/FileSystemRMStateStore.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/FileSystemRMStateStore.java @@ -111,70 +111,66 @@ public class FileSystemRMStateStore extends RMStateStore { private void loadRMAppState(RMState rmState) throws Exception { try { - FileStatus[] childNodes = fs.listStatus(rmAppRoot); List attempts = - new ArrayList(); - for(FileStatus childNodeStatus : childNodes) { - assert childNodeStatus.isFile(); - String childNodeName = childNodeStatus.getPath().getName(); - Path childNodePath = getNodePath(rmAppRoot, childNodeName); - byte[] childData = readFile(childNodePath, childNodeStatus.getLen()); - if(childNodeName.startsWith(ApplicationId.appIdStrPrefix)){ - // application - LOG.info("Loading application from node: " + childNodeName); - ApplicationId appId = ConverterUtils.toApplicationId(childNodeName); - ApplicationStateDataPBImpl appStateData = - new ApplicationStateDataPBImpl( - ApplicationStateDataProto.parseFrom(childData)); - ApplicationState appState = new ApplicationState( - appStateData.getSubmitTime(), - appStateData.getApplicationSubmissionContext(), - appStateData.getUser()); - // assert child node name is same as actual applicationId - assert appId.equals(appState.context.getApplicationId()); - rmState.appState.put(appId, appState); - } else if(childNodeName.startsWith( - ApplicationAttemptId.appAttemptIdStrPrefix)) { - // attempt - LOG.info("Loading application attempt from node: " + childNodeName); - ApplicationAttemptId attemptId = - ConverterUtils.toApplicationAttemptId(childNodeName); - ApplicationAttemptStateDataPBImpl attemptStateData = - new ApplicationAttemptStateDataPBImpl( - ApplicationAttemptStateDataProto.parseFrom(childData)); - Credentials credentials = null; - if(attemptStateData.getAppAttemptTokens() != null){ - credentials = new Credentials(); - DataInputByteBuffer dibb = new DataInputByteBuffer(); - dibb.reset(attemptStateData.getAppAttemptTokens()); - credentials.readTokenStorageStream(dibb); - } - ApplicationAttemptState attemptState = - new ApplicationAttemptState(attemptId, - attemptStateData.getMasterContainer(), credentials); + new ArrayList(); - // assert child node name is same as application attempt id - assert attemptId.equals(attemptState.getAttemptId()); - attempts.add(attemptState); - } else { - LOG.info("Unknown child node with name: " + childNodeName); + for (FileStatus appDir : fs.listStatus(rmAppRoot)) { + for (FileStatus childNodeStatus : fs.listStatus(appDir.getPath())) { + assert childNodeStatus.isFile(); + String childNodeName = childNodeStatus.getPath().getName(); + byte[] childData = + readFile(childNodeStatus.getPath(), childNodeStatus.getLen()); + if (childNodeName.startsWith(ApplicationId.appIdStrPrefix)) { + // application + LOG.info("Loading application from node: " + childNodeName); + ApplicationId appId = ConverterUtils.toApplicationId(childNodeName); + ApplicationStateDataPBImpl appStateData = + new ApplicationStateDataPBImpl( + ApplicationStateDataProto.parseFrom(childData)); + ApplicationState appState = + new ApplicationState(appStateData.getSubmitTime(), + appStateData.getApplicationSubmissionContext(), + appStateData.getUser()); + // assert child node name is same as actual applicationId + assert appId.equals(appState.context.getApplicationId()); + rmState.appState.put(appId, appState); + } else if (childNodeName + .startsWith(ApplicationAttemptId.appAttemptIdStrPrefix)) { + // attempt + LOG.info("Loading application attempt from node: " + childNodeName); + ApplicationAttemptId attemptId = + ConverterUtils.toApplicationAttemptId(childNodeName); + ApplicationAttemptStateDataPBImpl attemptStateData = + new ApplicationAttemptStateDataPBImpl( + ApplicationAttemptStateDataProto.parseFrom(childData)); + Credentials credentials = null; + if (attemptStateData.getAppAttemptTokens() != null) { + credentials = new Credentials(); + DataInputByteBuffer dibb = new DataInputByteBuffer(); + dibb.reset(attemptStateData.getAppAttemptTokens()); + credentials.readTokenStorageStream(dibb); + } + ApplicationAttemptState attemptState = + new ApplicationAttemptState(attemptId, + attemptStateData.getMasterContainer(), credentials); + + // assert child node name is same as application attempt id + assert attemptId.equals(attemptState.getAttemptId()); + attempts.add(attemptState); + } else { + LOG.info("Unknown child node with name: " + childNodeName); + } } } - // go through all attempts and add them to their apps - for(ApplicationAttemptState attemptState : attempts) { + // go through all attempts and add them to their apps, Ideally, each + // attempt node must have a corresponding app node, because remove + // directory operation remove both at the same time + for (ApplicationAttemptState attemptState : attempts) { ApplicationId appId = attemptState.getAttemptId().getApplicationId(); ApplicationState appState = rmState.appState.get(appId); - if(appState != null) { - appState.attempts.put(attemptState.getAttemptId(), attemptState); - } else { - // the application node may have been removed when the application - // completed but the RM might have stopped before it could remove the - // application attempt nodes - LOG.info("Application node not found for attempt: " - + attemptState.getAttemptId()); - deleteFile(getNodePath(rmAppRoot, attemptState.getAttemptId().toString())); - } + assert appState != null; + appState.attempts.put(attemptState.getAttemptId(), attemptState); } } catch (Exception e) { LOG.error("Failed to load state.", e); @@ -188,6 +184,12 @@ public class FileSystemRMStateStore extends RMStateStore { for(FileStatus childNodeStatus : childNodes) { assert childNodeStatus.isFile(); String childNodeName = childNodeStatus.getPath().getName(); + if(childNodeName.startsWith(DELEGATION_TOKEN_SEQUENCE_NUMBER_PREFIX)) { + rmState.rmSecretManagerState.dtSequenceNumber = + Integer.parseInt(childNodeName.split("_")[1]); + continue; + } + Path childNodePath = getNodePath(rmDTSecretManagerRoot, childNodeName); byte[] childData = readFile(childNodePath, childNodeStatus.getLen()); ByteArrayInputStream is = new ByteArrayInputStream(childData); @@ -202,10 +204,7 @@ public class FileSystemRMStateStore extends RMStateStore { long renewDate = fsIn.readLong(); rmState.rmSecretManagerState.delegationTokenState.put(identifier, renewDate); - } else if(childNodeName.startsWith(DELEGATION_TOKEN_SEQUENCE_NUMBER_PREFIX)) { - rmState.rmSecretManagerState.dtSequenceNumber = - Integer.parseInt(childNodeName.split("_")[1]); - }else { + } else { LOG.warn("Unknown file for recovering RMDelegationTokenSecretManager"); } fsIn.close(); @@ -215,7 +214,9 @@ public class FileSystemRMStateStore extends RMStateStore { @Override public synchronized void storeApplicationState(String appId, ApplicationStateDataPBImpl appStateDataPB) throws Exception { - Path nodeCreatePath = getNodePath(rmAppRoot, appId); + Path appDirPath = getAppDir(rmAppRoot, appId); + fs.mkdirs(appDirPath); + Path nodeCreatePath = getNodePath(appDirPath, appId); LOG.info("Storing info for app: " + appId + " at: " + nodeCreatePath); byte[] appStateData = appStateDataPB.getProto().toByteArray(); @@ -232,7 +233,11 @@ public class FileSystemRMStateStore extends RMStateStore { @Override public synchronized void storeApplicationAttemptState(String attemptId, ApplicationAttemptStateDataPBImpl attemptStateDataPB) throws Exception { - Path nodeCreatePath = getNodePath(rmAppRoot, attemptId); + ApplicationAttemptId appAttemptId = + ConverterUtils.toApplicationAttemptId(attemptId); + Path appDirPath = + getAppDir(rmAppRoot, appAttemptId.getApplicationId().toString()); + Path nodeCreatePath = getNodePath(appDirPath, attemptId); LOG.info("Storing info for attempt: " + attemptId + " at: " + nodeCreatePath); byte[] attemptStateData = attemptStateDataPB.getProto().toByteArray(); @@ -250,20 +255,9 @@ public class FileSystemRMStateStore extends RMStateStore { public synchronized void removeApplicationState(ApplicationState appState) throws Exception { String appId = appState.getAppId().toString(); - Path nodeRemovePath = getNodePath(rmAppRoot, appId); + Path nodeRemovePath = getAppDir(rmAppRoot, appId); LOG.info("Removing info for app: " + appId + " at: " + nodeRemovePath); deleteFile(nodeRemovePath); - for(ApplicationAttemptId attemptId : appState.attempts.keySet()) { - removeApplicationAttemptState(attemptId.toString()); - } - } - - public synchronized void removeApplicationAttemptState(String attemptId) - throws Exception { - Path nodeRemovePath = getNodePath(rmAppRoot, attemptId); - LOG.info("Removing info for attempt: " + attemptId - + " at: " + nodeRemovePath); - deleteFile(nodeRemovePath); } @Override @@ -329,6 +323,10 @@ public class FileSystemRMStateStore extends RMStateStore { deleteFile(nodeCreatePath); } + private Path getAppDir(Path root, String appId) { + return getNodePath(root, appId); + } + // FileSystem related code private void deleteFile(Path deletePath) throws Exception { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestRMStateStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestRMStateStore.java index ef54c233ff0..a24af257bd4 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestRMStateStore.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestRMStateStore.java @@ -105,8 +105,6 @@ public class TestRMStateStore { interface RMStateStoreHelper { RMStateStore getRMStateStore() throws Exception; - void addOrphanAttemptIfNeeded(RMStateStore testStore, - TestDispatcher dispatcher) throws Exception; boolean isFinalStateValid() throws Exception; } @@ -153,15 +151,6 @@ public class TestRMStateStore { return store; } - @Override - public void addOrphanAttemptIfNeeded(RMStateStore testStore, - TestDispatcher dispatcher) throws Exception { - ApplicationAttemptId attemptId = ConverterUtils.toApplicationAttemptId( - "appattempt_1352994193343_0003_000001"); - storeAttempt(testStore, attemptId, - "container_1352994193343_0003_01_000001", null, null, dispatcher); - } - @Override public boolean isFinalStateValid() throws Exception { FileSystem fs = cluster.getFileSystem(); @@ -289,9 +278,6 @@ public class TestRMStateStore { attempts.put(attemptIdRemoved, mockRemovedAttempt); store.removeApplication(mockRemovedApp); - // add orphan attempt file to simulate incomplete removal of app state - stateStoreHelper.addOrphanAttemptIfNeeded(store, dispatcher); - // let things settle down Thread.sleep(1000); store.close(); @@ -301,9 +287,6 @@ public class TestRMStateStore { RMState state = store.loadState(); Map rmAppState = state.getApplicationState(); - // removed app or orphan attempt is not loaded - assertEquals(1, rmAppState.size()); - ApplicationState appState = rmAppState.get(appId1); // app is loaded assertNotNull(appState);