diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt index 3898bbc84ec..ee50cfecc77 100644 --- a/hadoop-yarn-project/CHANGES.txt +++ b/hadoop-yarn-project/CHANGES.txt @@ -137,6 +137,9 @@ Release 2.7.0 - UNRELEASED YARN-2880. Added a test to make sure node labels will be recovered if RM restart is enabled. (Rohith Sharmaks via jianhe) + YARN-2996. Improved synchronization and I/O operations of FS- and Mem- + RMStateStore. (Yi Liu via zjshen) + OPTIMIZATIONS BUG FIXES diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/FileSystemRMStateStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/FileSystemRMStateStore.java index 77836620b29..6e830a028ab 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/FileSystemRMStateStore.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/FileSystemRMStateStore.java @@ -139,8 +139,8 @@ public class FileSystemRMStateStore extends RMStateStore { @Override protected synchronized Version loadVersion() throws Exception { Path versionNodePath = getNodePath(rootDirPath, VERSION_NODE); - if (fs.exists(versionNodePath)) { - FileStatus status = fs.getFileStatus(versionNodePath); + FileStatus status = getFileStatus(versionNodePath); + if (status != null) { byte[] data = readFile(versionNodePath, status.getLen()); Version version = new VersionPBImpl(VersionProto.parseFrom(data)); @@ -165,9 +165,9 @@ public class FileSystemRMStateStore extends RMStateStore { public synchronized long getAndIncrementEpoch() throws Exception { Path epochNodePath = getNodePath(rootDirPath, EPOCH_NODE); long currentEpoch = 0; - if (fs.exists(epochNodePath)) { + FileStatus status = getFileStatus(epochNodePath); + if (status != null) { // load current epoch - FileStatus status = fs.getFileStatus(epochNodePath); byte[] data = readFile(epochNodePath, status.getLen()); Epoch epoch = new EpochPBImpl(EpochProto.parseFrom(data)); currentEpoch = epoch.getEpoch(); @@ -201,13 +201,11 @@ public class FileSystemRMStateStore extends RMStateStore { checkAndResumeUpdateOperation(amrmTokenSecretManagerRoot); Path amrmTokenSecretManagerStateDataDir = new Path(amrmTokenSecretManagerRoot, AMRMTOKEN_SECRET_MANAGER_NODE); - FileStatus status; - try { - status = fs.getFileStatus(amrmTokenSecretManagerStateDataDir); - assert status.isFile(); - } catch (FileNotFoundException ex) { + FileStatus status = getFileStatus(amrmTokenSecretManagerStateDataDir); + if (status == null) { return; } + assert status.isFile(); byte[] data = readFile(amrmTokenSecretManagerStateDataDir, status.getLen()); AMRMTokenSecretManagerStatePBImpl stateData = new AMRMTokenSecretManagerStatePBImpl( @@ -466,7 +464,7 @@ public class FileSystemRMStateStore extends RMStateStore { } @Override - protected void updateRMDelegationTokenState( + protected synchronized void updateRMDelegationTokenState( RMDelegationTokenIdentifier rmDTIdentifier, Long renewDate) throws Exception { storeOrUpdateRMDelegationTokenState(rmDTIdentifier, renewDate, true); @@ -560,6 +558,14 @@ public class FileSystemRMStateStore extends RMStateStore { } } + private FileStatus getFileStatus(Path path) throws Exception { + try { + return fs.getFileStatus(path); + } catch (FileNotFoundException e) { + return null; + } + } + /* * In order to make this write atomic as a part of write we will first write * data to .tmp file and then rename it. Here we are assuming that rename is diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/MemoryRMStateStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/MemoryRMStateStore.java index 3646949b604..8cd776e6044 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/MemoryRMStateStore.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/MemoryRMStateStore.java @@ -91,15 +91,16 @@ public class MemoryRMStateStore extends RMStateStore { } @Override - public void storeApplicationStateInternal( + public synchronized void storeApplicationStateInternal( ApplicationId appId, ApplicationStateData appState) throws Exception { state.appState.put(appId, appState); } @Override - public void updateApplicationStateInternal(ApplicationId appId, - ApplicationStateData appState) throws Exception { + public synchronized void updateApplicationStateInternal( + ApplicationId appId, ApplicationStateData appState) + throws Exception { LOG.info("Updating final state " + appState.getState() + " for app: " + appId); if (state.appState.get(appId) != null) { @@ -186,7 +187,7 @@ public class MemoryRMStateStore extends RMStateStore { } @Override - protected void updateRMDelegationTokenState( + protected synchronized void updateRMDelegationTokenState( RMDelegationTokenIdentifier rmDTIdentifier, Long renewDate) throws Exception { removeRMDelegationTokenState(rmDTIdentifier); @@ -237,7 +238,7 @@ public class MemoryRMStateStore extends RMStateStore { } @Override - public void storeOrUpdateAMRMTokenSecretManagerState( + public synchronized void storeOrUpdateAMRMTokenSecretManagerState( AMRMTokenSecretManagerState amrmTokenSecretManagerState, boolean isUpdate) { if (amrmTokenSecretManagerState != null) {