YARN-2730. DefaultContainerExecutor runs only one localizer at a time. Contributed by Siqi Li

(cherry picked from commit 6157ace547)
This commit is contained in:
Jason Lowe 2014-11-03 20:37:47 +00:00
parent 3e41828639
commit 2098c68acb
2 changed files with 15 additions and 7 deletions

View File

@ -764,6 +764,9 @@ Release 2.6.0 - UNRELEASED
YARN-2785. Fixed intermittent TestContainerResourceUsage failure. (Varun Vasudev YARN-2785. Fixed intermittent TestContainerResourceUsage failure. (Varun Vasudev
via zjshen) via zjshen)
YARN-2730. DefaultContainerExecutor runs only one localizer at a time
(Siqi Li via jlowe)
Release 2.5.1 - 2014-09-05 Release 2.5.1 - 2014-09-05
INCOMPATIBLE CHANGES INCOMPATIBLE CHANGES

View File

@ -94,7 +94,7 @@ public class DefaultContainerExecutor extends ContainerExecutor {
} }
@Override @Override
public synchronized void startLocalizer(Path nmPrivateContainerTokensPath, public void startLocalizer(Path nmPrivateContainerTokensPath,
InetSocketAddress nmAddr, String user, String appId, String locId, InetSocketAddress nmAddr, String user, String appId, String locId,
LocalDirsHandlerService dirsHandler) LocalDirsHandlerService dirsHandler)
throws IOException, InterruptedException { throws IOException, InterruptedException {
@ -102,10 +102,6 @@ public class DefaultContainerExecutor extends ContainerExecutor {
List<String> localDirs = dirsHandler.getLocalDirs(); List<String> localDirs = dirsHandler.getLocalDirs();
List<String> logDirs = dirsHandler.getLogDirs(); List<String> logDirs = dirsHandler.getLogDirs();
ContainerLocalizer localizer =
new ContainerLocalizer(lfs, user, appId, locId, getPaths(localDirs),
RecordFactoryProvider.getRecordFactory(getConf()));
createUserLocalDirs(localDirs, user); createUserLocalDirs(localDirs, user);
createUserCacheDirs(localDirs, user); createUserCacheDirs(localDirs, user);
createAppDirs(localDirs, user, appId); createAppDirs(localDirs, user, appId);
@ -118,8 +114,17 @@ public class DefaultContainerExecutor extends ContainerExecutor {
Path tokenDst = new Path(appStorageDir, tokenFn); Path tokenDst = new Path(appStorageDir, tokenFn);
copyFile(nmPrivateContainerTokensPath, tokenDst, user); copyFile(nmPrivateContainerTokensPath, tokenDst, user);
LOG.info("Copying from " + nmPrivateContainerTokensPath + " to " + tokenDst); LOG.info("Copying from " + nmPrivateContainerTokensPath + " to " + tokenDst);
lfs.setWorkingDirectory(appStorageDir);
LOG.info("CWD set to " + appStorageDir + " = " + lfs.getWorkingDirectory());
FileContext localizerFc = FileContext.getFileContext(
lfs.getDefaultFileSystem(), getConf());
localizerFc.setUMask(lfs.getUMask());
localizerFc.setWorkingDirectory(appStorageDir);
LOG.info("Localizer CWD set to " + appStorageDir + " = "
+ localizerFc.getWorkingDirectory());
ContainerLocalizer localizer =
new ContainerLocalizer(localizerFc, user, appId, locId,
getPaths(localDirs), RecordFactoryProvider.getRecordFactory(getConf()));
// TODO: DO it over RPC for maintaining similarity? // TODO: DO it over RPC for maintaining similarity?
localizer.runLocalization(nmAddr); localizer.runLocalization(nmAddr);
} }