YARN-2730. DefaultContainerExecutor runs only one localizer at a time. Contributed by Siqi Li
This commit is contained in:
parent
67f13b58e4
commit
6157ace547
|
@ -839,6 +839,9 @@ Release 2.6.0 - UNRELEASED
|
||||||
YARN-2785. Fixed intermittent TestContainerResourceUsage failure. (Varun Vasudev
|
YARN-2785. Fixed intermittent TestContainerResourceUsage failure. (Varun Vasudev
|
||||||
via zjshen)
|
via zjshen)
|
||||||
|
|
||||||
|
YARN-2730. DefaultContainerExecutor runs only one localizer at a time
|
||||||
|
(Siqi Li via jlowe)
|
||||||
|
|
||||||
Release 2.5.1 - 2014-09-05
|
Release 2.5.1 - 2014-09-05
|
||||||
|
|
||||||
INCOMPATIBLE CHANGES
|
INCOMPATIBLE CHANGES
|
||||||
|
|
|
@ -94,7 +94,7 @@ public class DefaultContainerExecutor extends ContainerExecutor {
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public synchronized void startLocalizer(Path nmPrivateContainerTokensPath,
|
public void startLocalizer(Path nmPrivateContainerTokensPath,
|
||||||
InetSocketAddress nmAddr, String user, String appId, String locId,
|
InetSocketAddress nmAddr, String user, String appId, String locId,
|
||||||
LocalDirsHandlerService dirsHandler)
|
LocalDirsHandlerService dirsHandler)
|
||||||
throws IOException, InterruptedException {
|
throws IOException, InterruptedException {
|
||||||
|
@ -102,10 +102,6 @@ public class DefaultContainerExecutor extends ContainerExecutor {
|
||||||
List<String> localDirs = dirsHandler.getLocalDirs();
|
List<String> localDirs = dirsHandler.getLocalDirs();
|
||||||
List<String> logDirs = dirsHandler.getLogDirs();
|
List<String> logDirs = dirsHandler.getLogDirs();
|
||||||
|
|
||||||
ContainerLocalizer localizer =
|
|
||||||
new ContainerLocalizer(lfs, user, appId, locId, getPaths(localDirs),
|
|
||||||
RecordFactoryProvider.getRecordFactory(getConf()));
|
|
||||||
|
|
||||||
createUserLocalDirs(localDirs, user);
|
createUserLocalDirs(localDirs, user);
|
||||||
createUserCacheDirs(localDirs, user);
|
createUserCacheDirs(localDirs, user);
|
||||||
createAppDirs(localDirs, user, appId);
|
createAppDirs(localDirs, user, appId);
|
||||||
|
@ -118,8 +114,17 @@ public class DefaultContainerExecutor extends ContainerExecutor {
|
||||||
Path tokenDst = new Path(appStorageDir, tokenFn);
|
Path tokenDst = new Path(appStorageDir, tokenFn);
|
||||||
copyFile(nmPrivateContainerTokensPath, tokenDst, user);
|
copyFile(nmPrivateContainerTokensPath, tokenDst, user);
|
||||||
LOG.info("Copying from " + nmPrivateContainerTokensPath + " to " + tokenDst);
|
LOG.info("Copying from " + nmPrivateContainerTokensPath + " to " + tokenDst);
|
||||||
lfs.setWorkingDirectory(appStorageDir);
|
|
||||||
LOG.info("CWD set to " + appStorageDir + " = " + lfs.getWorkingDirectory());
|
|
||||||
|
FileContext localizerFc = FileContext.getFileContext(
|
||||||
|
lfs.getDefaultFileSystem(), getConf());
|
||||||
|
localizerFc.setUMask(lfs.getUMask());
|
||||||
|
localizerFc.setWorkingDirectory(appStorageDir);
|
||||||
|
LOG.info("Localizer CWD set to " + appStorageDir + " = "
|
||||||
|
+ localizerFc.getWorkingDirectory());
|
||||||
|
ContainerLocalizer localizer =
|
||||||
|
new ContainerLocalizer(localizerFc, user, appId, locId,
|
||||||
|
getPaths(localDirs), RecordFactoryProvider.getRecordFactory(getConf()));
|
||||||
// TODO: DO it over RPC for maintaining similarity?
|
// TODO: DO it over RPC for maintaining similarity?
|
||||||
localizer.runLocalization(nmAddr);
|
localizer.runLocalization(nmAddr);
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue