MAPREDUCE-3121. NodeManager should handle disk-failures (Ravi Gummadi via mahadev) - Merging r1208131 from trunk.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-0.23@1208135 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Mahadev Konar 2011-11-29 23:28:16 +00:00
parent c4b13d74e5
commit de8f0efe60
53 changed files with 1473 additions and 516 deletions

View File

@ -6,6 +6,8 @@ Release 0.23.1 - Unreleased
NEW FEATURES
MAPREDUCE-3121. NodeManager should handle disk-failures (Ravi Gummadi via mahadev)
IMPROVEMENTS
MAPREDUCE-3375. [Gridmix] Memory Emulation system tests.
(Vinay Thota via amarrk)

View File

@ -1,3 +1,3 @@
yarn.nodemanager.local-dirs=#configured value of yarn.nodemanager.local-dirs. It can be a list of comma separated paths.
yarn.nodemanager.log-dirs=#configured value of yarn.nodemanager.log-dirs.
yarn.nodemanager.linux-container-executor.group=#configured value of yarn.nodemanager.linux-container-executor.group
banned.users=#comma separated list of users who can not run applications
min.user.id=1000#Prevent other super-users

View File

@ -113,9 +113,10 @@ public void setup(JobConf conf) throws IOException {
Map<LocalResource, Future<Path>> resourcesToPaths = Maps.newHashMap();
ExecutorService exec = Executors.newCachedThreadPool();
Path destPath = localDirAllocator.getLocalPathForWrite(".", conf);
for (LocalResource resource : localResources.values()) {
Callable<Path> download = new FSDownload(localFSFileContext, ugi, conf,
localDirAllocator, resource, new Random());
destPath, resource, new Random());
Future<Path> future = exec.submit(download);
resourcesToPaths.put(resource, future);
}

View File

@ -56,7 +56,7 @@ public MiniMRYarnCluster(String testName) {
}
public MiniMRYarnCluster(String testName, int noOfNMs) {
super(testName, noOfNMs);
super(testName, noOfNMs, 4, 4);
//TODO: add the history server
historyServerWrapper = new JobHistoryServerWrapper();
addService(historyServerWrapper);

View File

@ -43,7 +43,8 @@ public class TestDistributedShell {
public static void setup() throws InterruptedException, IOException {
LOG.info("Starting up YARN cluster");
if (yarnCluster == null) {
yarnCluster = new MiniYARNCluster(TestDistributedShell.class.getName());
yarnCluster = new MiniYARNCluster(TestDistributedShell.class.getName(),
1, 1, 1);
yarnCluster.init(conf);
yarnCluster.start();
}

View File

@ -351,13 +351,39 @@ public class YarnConfiguration extends Configuration {
/** Class that calculates containers current resource utilization.*/
public static final String NM_CONTAINER_MON_RESOURCE_CALCULATOR =
NM_PREFIX + "container-monitor.resource-calculator.class";
/**
* Enable/Disable disks' health checker. Default is true.
* An expert level configuration property.
*/
public static final String NM_DISK_HEALTH_CHECK_ENABLE =
NM_PREFIX + "disk-health-checker.enable";
/** Frequency of running disks' health checker.*/
public static final String NM_DISK_HEALTH_CHECK_INTERVAL_MS =
NM_PREFIX + "disk-health-checker.interval-ms";
/** By default, disks' health is checked every 2 minutes. */
public static final long DEFAULT_NM_DISK_HEALTH_CHECK_INTERVAL_MS =
2 * 60 * 1000;
/**
* The minimum fraction of number of disks to be healthy for the nodemanager
* to launch new containers. This applies to nm-local-dirs and nm-log-dirs.
*/
public static final String NM_MIN_HEALTHY_DISKS_FRACTION =
NM_PREFIX + "disk-health-checker.min-healthy-disks";
/**
* By default, at least 5% of disks are to be healthy to say that the node
* is healthy in terms of disks.
*/
public static final float DEFAULT_NM_MIN_HEALTHY_DISKS_FRACTION
= 0.25F;
/** Frequency of running node health script.*/
public static final String NM_HEALTH_CHECK_INTERVAL_MS =
NM_PREFIX + "health-checker.interval-ms";
public static final long DEFAULT_NM_HEALTH_CHECK_INTERVAL_MS = 10 * 60 * 1000;
/** Script time out period.*/
/** Health check script time out period.*/
public static final String NM_HEALTH_CHECK_SCRIPT_TIMEOUT_MS =
NM_PREFIX + "health-checker.script.timeout-ms";
public static final long DEFAULT_NM_HEALTH_CHECK_SCRIPT_TIMEOUT_MS =

View File

@ -31,6 +31,7 @@
import java.security.PrivilegedExceptionAction;
import java.util.EnumSet;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
@ -105,12 +106,12 @@ public String toString() {
public static class LogValue {
private final String[] rootLogDirs;
private final List<String> rootLogDirs;
private final ContainerId containerId;
// TODO Maybe add a version string here. Instead of changing the version of
// the entire k-v format
public LogValue(String[] rootLogDirs, ContainerId containerId) {
public LogValue(List<String> rootLogDirs, ContainerId containerId) {
this.rootLogDirs = rootLogDirs;
this.containerId = containerId;
}

View File

@ -33,7 +33,6 @@
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.LocalDirAllocator;
import org.apache.hadoop.fs.Options.Rename;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.security.UserGroupInformation;
@ -56,7 +55,10 @@ public class FSDownload implements Callable<Path> {
private final UserGroupInformation userUgi;
private Configuration conf;
private LocalResource resource;
private LocalDirAllocator dirs;
/** The local FS dir path under which this resource is to be localized to */
private Path destDirPath;
private static final FsPermission cachePerms = new FsPermission(
(short) 0755);
static final FsPermission PUBLIC_FILE_PERMS = new FsPermission((short) 0555);
@ -65,10 +67,11 @@ public class FSDownload implements Callable<Path> {
static final FsPermission PUBLIC_DIR_PERMS = new FsPermission((short) 0755);
static final FsPermission PRIVATE_DIR_PERMS = new FsPermission((short) 0700);
public FSDownload(FileContext files, UserGroupInformation ugi, Configuration conf,
LocalDirAllocator dirs, LocalResource resource, Random rand) {
Path destDirPath, LocalResource resource, Random rand) {
this.conf = conf;
this.dirs = dirs;
this.destDirPath = destDirPath;
this.files = files;
this.userUgi = ugi;
this.resource = resource;
@ -136,15 +139,13 @@ public Path call() throws Exception {
}
Path tmp;
Path dst =
dirs.getLocalPathForWrite(".", getEstimatedSize(resource),
conf);
do {
tmp = new Path(dst, String.valueOf(rand.nextLong()));
tmp = new Path(destDirPath, String.valueOf(rand.nextLong()));
} while (files.util().exists(tmp));
dst = tmp;
files.mkdir(dst, cachePerms, false);
final Path dst_work = new Path(dst + "_tmp");
destDirPath = tmp;
files.mkdir(destDirPath, cachePerms, false);
final Path dst_work = new Path(destDirPath + "_tmp");
files.mkdir(dst_work, cachePerms, false);
Path dFinal = files.makeQualified(new Path(dst_work, sCopy.getName()));
@ -158,9 +159,9 @@ public Path run() throws Exception {
});
unpack(new File(dTmp.toUri()), new File(dFinal.toUri()));
changePermissions(dFinal.getFileSystem(conf), dFinal);
files.rename(dst_work, dst, Rename.OVERWRITE);
files.rename(dst_work, destDirPath, Rename.OVERWRITE);
} catch (Exception e) {
try { files.delete(dst, true); } catch (IOException ignore) { }
try { files.delete(destDirPath, true); } catch (IOException ignore) { }
throw e;
} finally {
try {
@ -170,9 +171,8 @@ public Path run() throws Exception {
rand = null;
conf = null;
resource = null;
dirs = null;
}
return files.makeQualified(new Path(dst, sCopy.getName()));
return files.makeQualified(new Path(destDirPath, sCopy.getName()));
}
/**
@ -221,17 +221,4 @@ public Void run() throws Exception {
}
}
private static long getEstimatedSize(LocalResource rsrc) {
if (rsrc.getSize() < 0) {
return -1;
}
switch (rsrc.getType()) {
case ARCHIVE:
return 5 * rsrc.getSize();
case FILE:
default:
return rsrc.getSize();
}
}
}

View File

@ -146,13 +146,14 @@ public void testDownload() throws IOException, URISyntaxException,
vis = LocalResourceVisibility.APPLICATION;
break;
}
LocalResource rsrc = createFile(files, new Path(basedir, "" + i),
sizes[i], rand, vis);
Path p = new Path(basedir, "" + i);
LocalResource rsrc = createFile(files, p, sizes[i], rand, vis);
rsrcVis.put(rsrc, vis);
Path destPath = dirs.getLocalPathForWrite(
basedir.toString(), sizes[i], conf);
FSDownload fsd =
new FSDownload(files, UserGroupInformation.getCurrentUser(), conf,
dirs, rsrc, new Random(sharedSeed));
destPath, rsrc, new Random(sharedSeed));
pending.put(rsrc, exec.submit(fsd));
}
@ -249,13 +250,15 @@ public void testDirDownload() throws IOException, InterruptedException {
vis = LocalResourceVisibility.APPLICATION;
break;
}
LocalResource rsrc = createJar(files, new Path(basedir, "dir" + i
+ ".jar"), vis);
Path p = new Path(basedir, "dir" + i + ".jar");
LocalResource rsrc = createJar(files, p, vis);
rsrcVis.put(rsrc, vis);
Path destPath = dirs.getLocalPathForWrite(
basedir.toString(), conf);
FSDownload fsd =
new FSDownload(files, UserGroupInformation.getCurrentUser(), conf,
dirs, rsrc, new Random(sharedSeed));
destPath, rsrc, new Random(sharedSeed));
pending.put(rsrc, exec.submit(fsd));
}

View File

@ -388,6 +388,22 @@
<value></value>
</property>
<property>
<description>Frequency of running disk health checker code.</description>
<name>yarn.nodemanager.disk-health-checker.interval-ms</name>
<value>120000</value>
</property>
<property>
<description>The minimum fraction of number of disks to be healthy for the
nodemanager to launch new containers. This correspond to both
yarn-nodemanager.local-dirs and yarn.nodemanager.log-dirs. i.e. If there
are less number of healthy local-dirs (or log-dirs) available, then
new containers will not be launched on this node.</description>
<name>yarn.nodemanager.disk-health-checker.min-healthy-disks</name>
<value>0.25</value>
</property>
<property>
<description>The path to the Linux container executor.</description>
<name>yarn.nodemanager.linux-container-executor.path</name>

View File

@ -45,6 +45,7 @@ public abstract class ContainerExecutor implements Configurable {
FsPermission.createImmutable((short) 0700);
private Configuration conf;
private ConcurrentMap<ContainerId, Path> pidFiles =
new ConcurrentHashMap<ContainerId, Path>();
@ -68,7 +69,7 @@ public Configuration getConf() {
* @throws IOException
*/
public abstract void init() throws IOException;
/**
* Prepare the environment for containers in this application to execute.
* For $x in local.dirs
@ -82,12 +83,14 @@ public Configuration getConf() {
* @param appId id of the application
* @param nmPrivateContainerTokens path to localized credentials, rsrc by NM
* @param nmAddr RPC address to contact NM
* @param localDirs nm-local-dirs
* @param logDirs nm-log-dirs
* @throws IOException For most application init failures
* @throws InterruptedException If application init thread is halted by NM
*/
public abstract void startLocalizer(Path nmPrivateContainerTokens,
InetSocketAddress nmAddr, String user, String appId, String locId,
List<Path> localDirs)
List<String> localDirs, List<String> logDirs)
throws IOException, InterruptedException;
@ -100,12 +103,15 @@ public abstract void startLocalizer(Path nmPrivateContainerTokens,
* @param user the user of the container
* @param appId the appId of the container
* @param containerWorkDir the work dir for the container
* @param localDirs nm-local-dirs to be used for this container
* @param logDirs nm-log-dirs to be used for this container
* @return the return status of the launch
* @throws IOException
*/
public abstract int launchContainer(Container container,
Path nmPrivateContainerScriptPath, Path nmPrivateTokensPath,
String user, String appId, Path containerWorkDir) throws IOException;
String user, String appId, Path containerWorkDir, List<String> localDirs,
List<String> logDirs) throws IOException;
public abstract boolean signalContainer(String user, String pid,
Signal signal)
@ -116,7 +122,8 @@ public abstract void deleteAsUser(String user, Path subDir, Path... basedirs)
public enum ExitCode {
FORCE_KILLED(137),
TERMINATED(143);
TERMINATED(143),
DISKS_FAILED(-101);
private final int code;
private ExitCode(int exitCode) {

View File

@ -26,6 +26,7 @@
import java.io.IOException;
import java.io.PrintStream;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.EnumSet;
import java.util.List;
@ -39,7 +40,6 @@
import org.apache.hadoop.util.Shell.ExitCodeException;
import org.apache.hadoop.util.Shell.ShellCommandExecutor;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerDiagnosticsUpdateEvent;
@ -77,16 +77,17 @@ public void init() throws IOException {
@Override
public void startLocalizer(Path nmPrivateContainerTokensPath,
InetSocketAddress nmAddr, String user, String appId, String locId,
List<Path> localDirs) throws IOException, InterruptedException {
List<String> localDirs, List<String> logDirs)
throws IOException, InterruptedException {
ContainerLocalizer localizer =
new ContainerLocalizer(this.lfs, user, appId, locId,
localDirs, RecordFactoryProvider.getRecordFactory(getConf()));
new ContainerLocalizer(lfs, user, appId, locId, getPaths(localDirs),
RecordFactoryProvider.getRecordFactory(getConf()));
createUserLocalDirs(localDirs, user);
createUserCacheDirs(localDirs, user);
createAppDirs(localDirs, user, appId);
createAppLogDirs(appId);
createAppLogDirs(appId, logDirs);
// TODO: Why pick first app dir. The same in LCE why not random?
Path appStorageDir = getFirstApplicationDir(localDirs, user, appId);
@ -104,8 +105,8 @@ public void startLocalizer(Path nmPrivateContainerTokensPath,
@Override
public int launchContainer(Container container,
Path nmPrivateContainerScriptPath, Path nmPrivateTokensPath,
String userName, String appId, Path containerWorkDir)
throws IOException {
String userName, String appId, Path containerWorkDir,
List<String> localDirs, List<String> logDirs) throws IOException {
ContainerId containerId = container.getContainerID();
@ -115,10 +116,7 @@ public int launchContainer(Container container,
ConverterUtils.toString(
container.getContainerID().getApplicationAttemptId().
getApplicationId());
String[] sLocalDirs = getConf().getStrings(
YarnConfiguration.NM_LOCAL_DIRS,
YarnConfiguration.DEFAULT_NM_LOCAL_DIRS);
for (String sLocalDir : sLocalDirs) {
for (String sLocalDir : localDirs) {
Path usersdir = new Path(sLocalDir, ContainerLocalizer.USERCACHE);
Path userdir = new Path(usersdir, userName);
Path appCacheDir = new Path(userdir, ContainerLocalizer.APPCACHE);
@ -128,7 +126,7 @@ public int launchContainer(Container container,
}
// Create the container log-dirs on all disks
createContainerLogDirs(appIdStr, containerIdStr);
createContainerLogDirs(appIdStr, containerIdStr, logDirs);
// copy launch script to work dir
Path launchDst =
@ -299,9 +297,9 @@ public void deleteAsUser(String user, Path subDir, Path... baseDirs)
* $logdir/$user/$appId */
private static final short LOGDIR_PERM = (short)0710;
private Path getFirstApplicationDir(List<Path> localDirs, String user,
private Path getFirstApplicationDir(List<String> localDirs, String user,
String appId) {
return getApplicationDir(localDirs.get(0), user, appId);
return getApplicationDir(new Path(localDirs.get(0)), user, appId);
}
private Path getApplicationDir(Path base, String user, String appId) {
@ -328,14 +326,14 @@ private Path getFileCacheDir(Path base, String user) {
* <li>$local.dir/usercache/$user</li>
* </ul>
*/
private void createUserLocalDirs(List<Path> localDirs, String user)
private void createUserLocalDirs(List<String> localDirs, String user)
throws IOException {
boolean userDirStatus = false;
FsPermission userperms = new FsPermission(USER_PERM);
for (Path localDir : localDirs) {
for (String localDir : localDirs) {
// create $local.dir/usercache/$user and its immediate parent
try {
lfs.mkdir(getUserCacheDir(localDir, user), userperms, true);
lfs.mkdir(getUserCacheDir(new Path(localDir), user), userperms, true);
} catch (IOException e) {
LOG.warn("Unable to create the user directory : " + localDir, e);
continue;
@ -357,7 +355,7 @@ private void createUserLocalDirs(List<Path> localDirs, String user)
* <li>$local.dir/usercache/$user/filecache</li>
* </ul>
*/
private void createUserCacheDirs(List<Path> localDirs, String user)
private void createUserCacheDirs(List<String> localDirs, String user)
throws IOException {
LOG.info("Initializing user " + user);
@ -366,9 +364,10 @@ private void createUserCacheDirs(List<Path> localDirs, String user)
FsPermission appCachePerms = new FsPermission(APPCACHE_PERM);
FsPermission fileperms = new FsPermission(FILECACHE_PERM);
for (Path localDir : localDirs) {
for (String localDir : localDirs) {
// create $local.dir/usercache/$user/appcache
final Path appDir = getAppcacheDir(localDir, user);
Path localDirPath = new Path(localDir);
final Path appDir = getAppcacheDir(localDirPath, user);
try {
lfs.mkdir(appDir, appCachePerms, true);
appcacheDirStatus = true;
@ -376,7 +375,7 @@ private void createUserCacheDirs(List<Path> localDirs, String user)
LOG.warn("Unable to create app cache directory : " + appDir, e);
}
// create $local.dir/usercache/$user/filecache
final Path distDir = getFileCacheDir(localDir, user);
final Path distDir = getFileCacheDir(localDirPath, user);
try {
lfs.mkdir(distDir, fileperms, true);
distributedCacheDirStatus = true;
@ -403,12 +402,12 @@ private void createUserCacheDirs(List<Path> localDirs, String user)
* </ul>
* @param localDirs
*/
private void createAppDirs(List<Path> localDirs, String user, String appId)
private void createAppDirs(List<String> localDirs, String user, String appId)
throws IOException {
boolean initAppDirStatus = false;
FsPermission appperms = new FsPermission(APPDIR_PERM);
for (Path localDir : localDirs) {
Path fullAppDir = getApplicationDir(localDir, user, appId);
for (String localDir : localDirs) {
Path fullAppDir = getApplicationDir(new Path(localDir), user, appId);
// create $local.dir/usercache/$user/appcache/$appId
try {
lfs.mkdir(fullAppDir, appperms, true);
@ -427,15 +426,12 @@ private void createAppDirs(List<Path> localDirs, String user, String appId)
/**
* Create application log directories on all disks.
*/
private void createAppLogDirs(String appId)
private void createAppLogDirs(String appId, List<String> logDirs)
throws IOException {
String[] rootLogDirs =
getConf()
.getStrings(YarnConfiguration.NM_LOG_DIRS, YarnConfiguration.DEFAULT_NM_LOG_DIRS);
boolean appLogDirStatus = false;
FsPermission appLogDirPerms = new FsPermission(LOGDIR_PERM);
for (String rootLogDir : rootLogDirs) {
for (String rootLogDir : logDirs) {
// create $log.dir/$appid
Path appLogDir = new Path(rootLogDir, appId);
try {
@ -455,15 +451,12 @@ private void createAppLogDirs(String appId)
/**
* Create application log directories on all disks.
*/
private void createContainerLogDirs(String appId, String containerId)
throws IOException {
String[] rootLogDirs =
getConf()
.getStrings(YarnConfiguration.NM_LOG_DIRS, YarnConfiguration.DEFAULT_NM_LOG_DIRS);
private void createContainerLogDirs(String appId, String containerId,
List<String> logDirs) throws IOException {
boolean containerLogDirStatus = false;
FsPermission containerLogDirPerms = new FsPermission(LOGDIR_PERM);
for (String rootLogDir : rootLogDirs) {
for (String rootLogDir : logDirs) {
// create $log.dir/$appid/$containerid
Path appLogDir = new Path(rootLogDir, appId);
Path containerLogDir = new Path(appLogDir, containerId);
@ -483,4 +476,15 @@ private void createContainerLogDirs(String appId, String containerId)
+ containerId);
}
}
/**
* @return the list of paths of given local directories
*/
private static List<Path> getPaths(List<String> dirs) {
List<Path> paths = new ArrayList<Path>(dirs.size());
for (int i = 0; i < dirs.size(); i++) {
paths.add(new Path(dirs.get(i)));
}
return paths;
}
}

View File

@ -0,0 +1,96 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.nodemanager;
import java.io.File;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.ListIterator;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.util.DiskChecker;
import org.apache.hadoop.util.DiskChecker.DiskErrorException;
/**
* Manages a list of local storage directories.
*/
class DirectoryCollection {
private static final Log LOG = LogFactory.getLog(DirectoryCollection.class);
// Good local storage directories
private List<String> localDirs;
private List<String> failedDirs;
private int numFailures;
public DirectoryCollection(String[] dirs) {
localDirs = new ArrayList<String>();
localDirs.addAll(Arrays.asList(dirs));
failedDirs = new ArrayList<String>();
}
/**
* @return the current valid directories
*/
synchronized List<String> getGoodDirs() {
return localDirs;
}
/**
* @return the failed directories
*/
synchronized List<String> getFailedDirs() {
return failedDirs;
}
/**
* @return total the number of directory failures seen till now
*/
synchronized int getNumFailures() {
return numFailures;
}
/**
* Check the health of current set of local directories, updating the list
* of valid directories if necessary.
* @return <em>true</em> if there is a new disk-failure identified in
* this checking. <em>false</em> otherwise.
*/
synchronized boolean checkDirs() {
int oldNumFailures = numFailures;
ListIterator<String> it = localDirs.listIterator();
while (it.hasNext()) {
final String dir = it.next();
try {
DiskChecker.checkDir(new File(dir));
} catch (DiskErrorException de) {
LOG.warn("Directory " + dir + " error " +
de.getMessage() + ", removing from the list of valid directories.");
it.remove();
failedDirs.add(dir);
numFailures++;
}
}
if (numFailures > oldNumFailures) {
return true;
}
return false;
}
}

View File

@ -126,13 +126,18 @@ public void init() throws IOException {
@Override
public void startLocalizer(Path nmPrivateContainerTokensPath,
InetSocketAddress nmAddr, String user, String appId, String locId,
List<Path> localDirs) throws IOException, InterruptedException {
List<String> localDirs, List<String> logDirs)
throws IOException, InterruptedException {
List<String> command = new ArrayList<String>(
Arrays.asList(containerExecutorExe,
user,
Integer.toString(Commands.INITIALIZE_CONTAINER.getValue()),
appId,
nmPrivateContainerTokensPath.toUri().getPath().toString()));
nmPrivateContainerTokensPath.toUri().getPath().toString(),
StringUtils.join(",", localDirs),
StringUtils.join(",", logDirs)));
File jvm = // use same jvm as parent
new File(new File(System.getProperty("java.home"), "bin"), "java");
command.add(jvm.toString());
@ -148,8 +153,8 @@ public void startLocalizer(Path nmPrivateContainerTokensPath,
command.add(locId);
command.add(nmAddr.getHostName());
command.add(Integer.toString(nmAddr.getPort()));
for (Path p : localDirs) {
command.add(p.toUri().getPath().toString());
for (String dir : localDirs) {
command.add(dir);
}
String[] commandArray = command.toArray(new String[command.size()]);
ShellCommandExecutor shExec = new ShellCommandExecutor(commandArray);
@ -174,7 +179,8 @@ public void startLocalizer(Path nmPrivateContainerTokensPath,
@Override
public int launchContainer(Container container,
Path nmPrivateCotainerScriptPath, Path nmPrivateTokensPath,
String user, String appId, Path containerWorkDir) throws IOException {
String user, String appId, Path containerWorkDir,
List<String> localDirs, List<String> logDirs) throws IOException {
ContainerId containerId = container.getContainerID();
String containerIdStr = ConverterUtils.toString(containerId);
@ -189,8 +195,10 @@ public int launchContainer(Container container,
.toString(Commands.LAUNCH_CONTAINER.getValue()), appId,
containerIdStr, containerWorkDir.toString(),
nmPrivateCotainerScriptPath.toUri().getPath().toString(),
nmPrivateTokensPath.toUri().getPath().toString(), pidFilePath
.toString()));
nmPrivateTokensPath.toUri().getPath().toString(),
pidFilePath.toString(),
StringUtils.join(",", localDirs),
StringUtils.join(",", logDirs)));
String[] commandArray = command.toArray(new String[command.size()]);
shExec = new ShellCommandExecutor(commandArray, null, // NM's cwd
container.getLaunchContext().getEnvironment()); // sanitized env

View File

@ -0,0 +1,297 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.nodemanager;
import java.io.IOException;
import java.util.List;
import java.util.Timer;
import java.util.TimerTask;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.LocalDirAllocator;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.service.AbstractService;
/**
* The class which provides functionality of checking the health of the local
* directories of a node. This specifically manages nodemanager-local-dirs and
* nodemanager-log-dirs by periodically checking their health.
*/
public class LocalDirsHandlerService extends AbstractService {
private static Log LOG = LogFactory.getLog(LocalDirsHandlerService.class);
/** Timer used to schedule disk health monitoring code execution */
private Timer dirsHandlerScheduler;
private long diskHealthCheckInterval;
private boolean isDiskHealthCheckerEnabled;
/**
* Minimum fraction of disks to be healthy for the node to be healthy in
* terms of disks. This applies to nm-local-dirs and nm-log-dirs.
*/
private float minNeededHealthyDisksFactor;
private MonitoringTimerTask monitoringTimerTask;
/** Local dirs to store localized files in */
private DirectoryCollection localDirs = null;
/** storage for container logs*/
private DirectoryCollection logDirs = null;
/**
* Everybody should go through this LocalDirAllocator object for read/write
* of any local path corresponding to {@link YarnConfiguration#NM_LOCAL_DIRS}
* instead of creating his/her own LocalDirAllocator objects
*/
private LocalDirAllocator localDirsAllocator;
/**
* Everybody should go through this LocalDirAllocator object for read/write
* of any local path corresponding to {@link YarnConfiguration#NM_LOG_DIRS}
* instead of creating his/her own LocalDirAllocator objects
*/
private LocalDirAllocator logDirsAllocator;
/** when disk health checking code was last run */
private long lastDisksCheckTime;
/**
* Class which is used by the {@link Timer} class to periodically execute the
* disks' health checker code.
*/
private final class MonitoringTimerTask extends TimerTask {
public MonitoringTimerTask(Configuration conf) {
localDirs = new DirectoryCollection(
conf.getTrimmedStrings(YarnConfiguration.NM_LOCAL_DIRS));
logDirs = new DirectoryCollection(
conf.getTrimmedStrings(YarnConfiguration.NM_LOG_DIRS));
localDirsAllocator =
new LocalDirAllocator(YarnConfiguration.NM_LOCAL_DIRS);
logDirsAllocator = new LocalDirAllocator(YarnConfiguration.NM_LOG_DIRS);
}
@Override
public void run() {
boolean newFailure = false;
if (localDirs.checkDirs()) {
newFailure = true;
}
if (logDirs.checkDirs()) {
newFailure = true;
}
if (newFailure) {
LOG.info("Disk(s) failed. " + getDisksHealthReport());
updateDirsInConfiguration();
if (!areDisksHealthy()) {
// Just log.
LOG.error("Most of the disks failed. " + getDisksHealthReport());
}
}
lastDisksCheckTime = System.currentTimeMillis();
}
}
public LocalDirsHandlerService() {
super(LocalDirsHandlerService.class.getName());
}
/**
* Method which initializes the timertask and its interval time.
*/
@Override
public void init(Configuration config) {
// Clone the configuration as we may do modifications to dirs-list
Configuration conf = new Configuration(config);
diskHealthCheckInterval = conf.getLong(
YarnConfiguration.NM_DISK_HEALTH_CHECK_INTERVAL_MS,
YarnConfiguration.DEFAULT_NM_DISK_HEALTH_CHECK_INTERVAL_MS);
monitoringTimerTask = new MonitoringTimerTask(conf);
isDiskHealthCheckerEnabled = conf.getBoolean(
YarnConfiguration.NM_DISK_HEALTH_CHECK_ENABLE, true);
minNeededHealthyDisksFactor = conf.getFloat(
YarnConfiguration.NM_MIN_HEALTHY_DISKS_FRACTION,
YarnConfiguration.DEFAULT_NM_MIN_HEALTHY_DISKS_FRACTION);
lastDisksCheckTime = System.currentTimeMillis();
super.init(conf);
}
/**
* Method used to start the disk health monitoring, if enabled.
*/
@Override
public void start() {
if (isDiskHealthCheckerEnabled) {
dirsHandlerScheduler = new Timer("DiskHealthMonitor-Timer", true);
// Start the timer task for disk health checking immediately and
// then run periodically at interval time.
dirsHandlerScheduler.scheduleAtFixedRate(monitoringTimerTask, 0,
diskHealthCheckInterval);
}
super.start();
}
/**
* Method used to terminate the disk health monitoring service.
*/
@Override
public void stop() {
if (dirsHandlerScheduler != null) {
dirsHandlerScheduler.cancel();
}
super.stop();
}
/**
* @return the good/valid local directories based on disks' health
*/
public List<String> getLocalDirs() {
return localDirs.getGoodDirs();
}
/**
* @return the good/valid log directories based on disks' health
*/
public List<String> getLogDirs() {
return logDirs.getGoodDirs();
}
/**
* @return the health report of nm-local-dirs and nm-log-dirs
*/
public String getDisksHealthReport() {
if (!isDiskHealthCheckerEnabled) {
return "";
}
StringBuilder report = new StringBuilder();
List<String> failedLocalDirsList = localDirs.getFailedDirs();
List<String> failedLogDirsList = logDirs.getFailedDirs();
int numLocalDirs = localDirs.getGoodDirs().size()
+ failedLocalDirsList.size();
int numLogDirs = logDirs.getGoodDirs().size() + failedLogDirsList.size();
if (!failedLocalDirsList.isEmpty()) {
report.append(failedLocalDirsList.size() + "/" + numLocalDirs
+ " local-dirs turned bad: "
+ StringUtils.join(",", failedLocalDirsList) + ";");
}
if (!failedLogDirsList.isEmpty()) {
report.append(failedLogDirsList.size() + "/" + numLogDirs
+ " log-dirs turned bad: "
+ StringUtils.join(",", failedLogDirsList));
}
return report.toString();
}
/**
* The minimum fraction of number of disks needed to be healthy for a node to
* be considered healthy in terms of disks is configured using
* {@link YarnConfiguration#NM_MIN_HEALTHY_DISKS_FRACTION}, with a default
* value of {@link YarnConfiguration#DEFAULT_NM_MIN_HEALTHY_DISKS_FRACTION}.
* @return <em>false</em> if either (a) more than the allowed percentage of
* nm-local-dirs failed or (b) more than the allowed percentage of
* nm-log-dirs failed.
*/
public boolean areDisksHealthy() {
if (!isDiskHealthCheckerEnabled) {
return true;
}
int goodDirs = getLocalDirs().size();
int failedDirs = localDirs.getFailedDirs().size();
int totalConfiguredDirs = goodDirs + failedDirs;
if (goodDirs/(float)totalConfiguredDirs < minNeededHealthyDisksFactor) {
return false; // Not enough healthy local-dirs
}
goodDirs = getLogDirs().size();
failedDirs = logDirs.getFailedDirs().size();
totalConfiguredDirs = goodDirs + failedDirs;
if (goodDirs/(float)totalConfiguredDirs < minNeededHealthyDisksFactor) {
return false; // Not enough healthy log-dirs
}
return true;
}
public long getLastDisksCheckTime() {
return lastDisksCheckTime;
}
/**
* Set good local dirs and good log dirs in the configuration so that the
* LocalDirAllocator objects will use this updated configuration only.
*/
private void updateDirsInConfiguration() {
Configuration conf = getConfig();
List<String> localDirs = getLocalDirs();
conf.setStrings(YarnConfiguration.NM_LOCAL_DIRS,
localDirs.toArray(new String[localDirs.size()]));
List<String> logDirs = getLogDirs();
synchronized(conf) {
conf.setStrings(YarnConfiguration.NM_LOG_DIRS,
logDirs.toArray(new String[logDirs.size()]));
}
}
public Path getLocalPathForWrite(String pathStr) throws IOException {
Configuration conf = getConfig();
Path path = null;
synchronized (conf) {
path = localDirsAllocator.getLocalPathForWrite(pathStr, conf);
}
return path;
}
public Path getLocalPathForWrite(String pathStr, long size,
boolean checkWrite) throws IOException {
Configuration conf = getConfig();
Path path = null;
synchronized (conf) {
path = localDirsAllocator.getLocalPathForWrite(pathStr, size, conf,
checkWrite);
}
return path;
}
public Path getLogPathForWrite(String pathStr, boolean checkWrite)
throws IOException {
Configuration conf = getConfig();
Path path = null;
synchronized (conf) {
path = logDirsAllocator.getLocalPathForWrite(pathStr,
LocalDirAllocator.SIZE_UNKNOWN, conf, checkWrite);
}
return path;
}
public Path getLogPathToRead(String pathStr) throws IOException {
Configuration conf = getConfig();
Path path = null;
synchronized (conf) {
path = logDirsAllocator.getLocalPathToRead(pathStr, conf);
}
return path;
}
}

View File

@ -0,0 +1,97 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.nodemanager;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.service.CompositeService;
/**
* The class which provides functionality of checking the health of the node and
* reporting back to the service for which the health checker has been asked to
* report.
*/
public class NodeHealthCheckerService extends CompositeService {
private NodeHealthScriptRunner nodeHealthScriptRunner;
private LocalDirsHandlerService dirsHandler;
static final String SEPARATOR = ";";
public NodeHealthCheckerService() {
super(NodeHealthCheckerService.class.getName());
dirsHandler = new LocalDirsHandlerService();
}
@Override
public void init(Configuration conf) {
if (NodeHealthScriptRunner.shouldRun(conf)) {
nodeHealthScriptRunner = new NodeHealthScriptRunner();
addService(nodeHealthScriptRunner);
}
addService(dirsHandler);
super.init(conf);
}
/**
* @return the reporting string of health of the node
*/
String getHealthReport() {
String scriptReport = (nodeHealthScriptRunner == null) ? ""
: nodeHealthScriptRunner.getHealthReport();
if (scriptReport.equals("")) {
return dirsHandler.getDisksHealthReport();
} else {
return scriptReport.concat(SEPARATOR + dirsHandler.getDisksHealthReport());
}
}
/**
* @return <em>true</em> if the node is healthy
*/
boolean isHealthy() {
boolean scriptHealthStatus = (nodeHealthScriptRunner == null) ? true
: nodeHealthScriptRunner.isHealthy();
return scriptHealthStatus && dirsHandler.areDisksHealthy();
}
/**
* @return when the last time the node health status is reported
*/
long getLastHealthReportTime() {
long diskCheckTime = dirsHandler.getLastDisksCheckTime();
long lastReportTime = (nodeHealthScriptRunner == null)
? diskCheckTime
: Math.max(nodeHealthScriptRunner.getLastReportedTime(), diskCheckTime);
return lastReportTime;
}
/**
* @return the disk handler
*/
public LocalDirsHandlerService getDiskHandler() {
return dirsHandler;
}
/**
* @return the node health script runner
*/
NodeHealthScriptRunner getNodeHealthScriptRunner() {
return nodeHealthScriptRunner;
}
}

View File

@ -16,7 +16,7 @@
* limitations under the License.
*/
package org.apache.hadoop;
package org.apache.hadoop.yarn.server.nodemanager;
import java.io.File;
import java.io.IOException;
@ -31,19 +31,18 @@
import org.apache.hadoop.util.Shell.ExitCodeException;
import org.apache.hadoop.util.Shell.ShellCommandExecutor;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.yarn.api.records.NodeHealthStatus;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.service.AbstractService;
/**
*
* The class which provides functionality of checking the health of the node and
* reporting back to the service for which the health checker has been asked to
* report.
* The class which provides functionality of checking the health of the node
* using the configured node health script and reporting back to the service
* for which the health checker has been asked to report.
*/
public class NodeHealthCheckerService extends AbstractService {
public class NodeHealthScriptRunner extends AbstractService {
private static Log LOG = LogFactory.getLog(NodeHealthCheckerService.class);
private static Log LOG = LogFactory.getLog(NodeHealthScriptRunner.class);
/** Absolute path to the health script. */
private String nodeHealthScript;
@ -74,7 +73,6 @@ public class NodeHealthCheckerService extends AbstractService {
private TimerTask timer;
private enum HealthCheckerExitStatus {
SUCCESS,
TIMED_OUT,
@ -187,18 +185,13 @@ private boolean hasErrors(String output) {
}
}
public NodeHealthCheckerService() {
super(NodeHealthCheckerService.class.getName());
public NodeHealthScriptRunner() {
super(NodeHealthScriptRunner.class.getName());
this.lastReportedTime = System.currentTimeMillis();
this.isHealthy = true;
this.healthReport = "";
}
public NodeHealthCheckerService(Configuration conf) {
this();
init(conf);
}
/*
* Method which initializes the values for the script path and interval time.
*/
@ -257,12 +250,12 @@ public void stop() {
*
* @return true if node is healthy
*/
private boolean isHealthy() {
public boolean isHealthy() {
return isHealthy;
}
/**
* Sets if the node is healhty or not.
* Sets if the node is healhty or not considering disks' health also.
*
* @param isHealthy
* if or not node is healthy
@ -277,13 +270,14 @@ private synchronized void setHealthy(boolean isHealthy) {
*
* @return output from health script
*/
private String getHealthReport() {
public String getHealthReport() {
return healthReport;
}
/**
* Sets the health report from the node health script.
*
* Sets the health report from the node health script. Also set the disks'
* health info obtained from DiskHealthCheckerService.
*
* @param healthReport
*/
private synchronized void setHealthReport(String healthReport) {
@ -295,7 +289,7 @@ private synchronized void setHealthReport(String healthReport) {
*
* @return timestamp when node health script was last run
*/
private long getLastReportedTime() {
public long getLastReportedTime() {
return lastReportedTime;
}
@ -340,27 +334,12 @@ private synchronized void setHealthStatus(boolean isHealthy, String output,
this.setHealthStatus(isHealthy, output);
this.setLastReportedTime(time);
}
/**
* Method to populate the fields for the {@link NodeHealthStatus}
*
* @param healthStatus
* Used only by tests to access the timer task directly
* @return the timer task
*/
public synchronized void setHealthStatus(NodeHealthStatus healthStatus) {
healthStatus.setIsNodeHealthy(this.isHealthy());
healthStatus.setHealthReport(this.getHealthReport());
healthStatus.setLastHealthReportTime(this.getLastReportedTime());
}
/**
* Test method to directly access the timer which node
* health checker would use.
*
*
* @return Timer task
*/
//XXX:Not to be used directly.
TimerTask getTimer() {
TimerTask getTimerTask() {
return timer;
}
}

View File

@ -25,7 +25,6 @@
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.NodeHealthCheckerService;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
import org.apache.hadoop.security.SecurityUtil;
@ -59,6 +58,8 @@ public class NodeManager extends CompositeService implements
protected final NodeManagerMetrics metrics = NodeManagerMetrics.create();
protected ContainerTokenSecretManager containerTokenSecretManager;
private ApplicationACLsManager aclsManager;
private NodeHealthCheckerService nodeHealthChecker;
private LocalDirsHandlerService dirsHandler;
public NodeManager() {
super(NodeManager.class.getName());
@ -78,14 +79,16 @@ protected NodeResourceMonitor createNodeResourceMonitor() {
protected ContainerManagerImpl createContainerManager(Context context,
ContainerExecutor exec, DeletionService del,
NodeStatusUpdater nodeStatusUpdater, ContainerTokenSecretManager
containerTokenSecretManager, ApplicationACLsManager aclsManager) {
containerTokenSecretManager, ApplicationACLsManager aclsManager,
LocalDirsHandlerService dirsHandler) {
return new ContainerManagerImpl(context, exec, del, nodeStatusUpdater,
metrics, containerTokenSecretManager, aclsManager);
metrics, containerTokenSecretManager, aclsManager, dirsHandler);
}
protected WebServer createWebServer(Context nmContext,
ResourceView resourceView, ApplicationACLsManager aclsManager) {
return new WebServer(nmContext, resourceView, aclsManager);
ResourceView resourceView, ApplicationACLsManager aclsManager,
LocalDirsHandlerService dirsHandler) {
return new WebServer(nmContext, resourceView, aclsManager, dirsHandler);
}
protected void doSecureLogin() throws IOException {
@ -121,16 +124,12 @@ public void init(Configuration conf) {
// NodeManager level dispatcher
AsyncDispatcher dispatcher = new AsyncDispatcher();
NodeHealthCheckerService healthChecker = null;
if (NodeHealthCheckerService.shouldRun(conf)) {
healthChecker = new NodeHealthCheckerService();
addService(healthChecker);
}
nodeHealthChecker = new NodeHealthCheckerService();
addService(nodeHealthChecker);
dirsHandler = nodeHealthChecker.getDiskHandler();
NodeStatusUpdater nodeStatusUpdater =
createNodeStatusUpdater(context, dispatcher, healthChecker,
this.containerTokenSecretManager);
NodeStatusUpdater nodeStatusUpdater = createNodeStatusUpdater(context,
dispatcher, nodeHealthChecker, this.containerTokenSecretManager);
nodeStatusUpdater.register(this);
NodeResourceMonitor nodeResourceMonitor = createNodeResourceMonitor();
@ -138,11 +137,11 @@ public void init(Configuration conf) {
ContainerManagerImpl containerManager =
createContainerManager(context, exec, del, nodeStatusUpdater,
this.containerTokenSecretManager, this.aclsManager);
this.containerTokenSecretManager, this.aclsManager, dirsHandler);
addService(containerManager);
Service webServer = createWebServer(context, containerManager
.getContainersMonitor(), this.aclsManager);
.getContainersMonitor(), this.aclsManager, dirsHandler);
addService(webServer);
dispatcher.register(ContainerManagerEventType.class, containerManager);
@ -215,7 +214,14 @@ public NodeHealthStatus getNodeHealthStatus() {
}
}
/**
* @return the node health checker
*/
public NodeHealthCheckerService getNodeHealthChecker() {
return nodeHealthChecker;
}
@Override
public void stateChanged(Service service) {
// Shutdown the Nodemanager when the NodeStatusUpdater is stopped.

View File

@ -27,7 +27,6 @@
import org.apache.avro.AvroRuntimeException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.NodeHealthCheckerService;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.security.UserGroupInformation;
@ -222,11 +221,14 @@ private NodeStatus getNodeStatus() {
+ numActiveContainers + " containers");
NodeHealthStatus nodeHealthStatus = this.context.getNodeHealthStatus();
if (this.healthChecker != null) {
this.healthChecker.setHealthStatus(nodeHealthStatus);
nodeHealthStatus.setHealthReport(healthChecker.getHealthReport());
nodeHealthStatus.setIsNodeHealthy(healthChecker.isHealthy());
nodeHealthStatus.setLastHealthReportTime(
healthChecker.getLastHealthReportTime());
if (LOG.isDebugEnabled()) {
LOG.debug("Node's health-status : " + nodeHealthStatus.getIsNodeHealthy()
+ ", " + nodeHealthStatus.getHealthReport());
}
LOG.debug("Node's health-status : " + nodeHealthStatus.getIsNodeHealthy()
+ ", " + nodeHealthStatus.getHealthReport());
nodeStatus.setNodeHealthStatus(nodeHealthStatus);
return nodeStatus;

View File

@ -68,6 +68,7 @@
import org.apache.hadoop.yarn.server.nodemanager.ContainerManagerEvent;
import org.apache.hadoop.yarn.server.nodemanager.Context;
import org.apache.hadoop.yarn.server.nodemanager.DeletionService;
import org.apache.hadoop.yarn.server.nodemanager.LocalDirsHandlerService;
import org.apache.hadoop.yarn.server.nodemanager.NMAuditLogger;
import org.apache.hadoop.yarn.server.nodemanager.NMAuditLogger.AuditConstants;
import org.apache.hadoop.yarn.server.nodemanager.NodeStatusUpdater;
@ -120,7 +121,8 @@ public class ContainerManagerImpl extends CompositeService implements
private ContainerTokenSecretManager containerTokenSecretManager;
private final RecordFactory recordFactory = RecordFactoryProvider.getRecordFactory(null);
protected LocalDirsHandlerService dirsHandler;
protected final AsyncDispatcher dispatcher;
private final ApplicationACLsManager aclsManager;
@ -129,9 +131,12 @@ public class ContainerManagerImpl extends CompositeService implements
public ContainerManagerImpl(Context context, ContainerExecutor exec,
DeletionService deletionContext, NodeStatusUpdater nodeStatusUpdater,
NodeManagerMetrics metrics, ContainerTokenSecretManager
containerTokenSecretManager, ApplicationACLsManager aclsManager) {
containerTokenSecretManager, ApplicationACLsManager aclsManager,
LocalDirsHandlerService dirsHandler) {
super(ContainerManagerImpl.class.getName());
this.context = context;
this.dirsHandler = dirsHandler;
dispatcher = new AsyncDispatcher();
this.deletionService = deletionContext;
this.metrics = metrics;
@ -190,9 +195,10 @@ protected LogHandler createLogHandler(Configuration conf, Context context,
if (conf.getBoolean(YarnConfiguration.NM_LOG_AGGREGATION_ENABLED,
YarnConfiguration.DEFAULT_NM_LOG_AGGREGATION_ENABLED)) {
return new LogAggregationService(this.dispatcher, context,
deletionService);
deletionService, dirsHandler);
} else {
return new NonAggregatingLogHandler(this.dispatcher, deletionService);
return new NonAggregatingLogHandler(this.dispatcher, deletionService,
dirsHandler);
}
}
@ -203,12 +209,12 @@ public ContainersMonitor getContainersMonitor() {
protected ResourceLocalizationService createResourceLocalizationService(
ContainerExecutor exec, DeletionService deletionContext) {
return new ResourceLocalizationService(this.dispatcher, exec,
deletionContext);
deletionContext, dirsHandler);
}
protected ContainersLauncher createContainersLauncher(Context context,
ContainerExecutor exec) {
return new ContainersLauncher(context, this.dispatcher, exec);
return new ContainersLauncher(context, this.dispatcher, exec, dirsHandler);
}
@Override

View File

@ -22,14 +22,20 @@
public class ContainerExitEvent extends ContainerEvent {
private int exitCode;
private final String diagnosticInfo;
public ContainerExitEvent(ContainerId cID, ContainerEventType eventType,
int exitCode) {
int exitCode, String diagnosticInfo) {
super(cID, eventType);
this.exitCode = exitCode;
this.diagnosticInfo = diagnosticInfo;
}
public int getExitCode() {
return this.exitCode;
}
public String getDiagnosticInfo() {
return diagnosticInfo;
}
}

View File

@ -50,6 +50,7 @@
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.event.Dispatcher;
import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor;
import org.apache.hadoop.yarn.server.nodemanager.LocalDirsHandlerService;
import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor.DelayedProcessKiller;
import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor.ExitCode;
import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor.Signal;
@ -78,7 +79,6 @@ public class ContainerLaunch implements Callable<Integer> {
private final Application app;
private final Container container;
private final Configuration conf;
private final LocalDirAllocator logDirsSelector;
private volatile AtomicBoolean shouldLaunchContainer = new AtomicBoolean(false);
private volatile AtomicBoolean completed = new AtomicBoolean(false);
@ -88,14 +88,17 @@ public class ContainerLaunch implements Callable<Integer> {
private Path pidFilePath = null;
private final LocalDirsHandlerService dirsHandler;
public ContainerLaunch(Configuration configuration, Dispatcher dispatcher,
ContainerExecutor exec, Application app, Container container) {
ContainerExecutor exec, Application app, Container container,
LocalDirsHandlerService dirsHandler) {
this.conf = configuration;
this.app = app;
this.exec = exec;
this.container = container;
this.dispatcher = dispatcher;
this.logDirsSelector = new LocalDirAllocator(YarnConfiguration.NM_LOG_DIRS);
this.dirsHandler = dirsHandler;
this.sleepDelayBeforeSigKill =
conf.getLong(YarnConfiguration.NM_SLEEP_DELAY_BEFORE_SIGKILL_MS,
YarnConfiguration.DEFAULT_NM_SLEEP_DELAY_BEFORE_SIGKILL_MS);
@ -121,9 +124,8 @@ public Integer call() {
List<String> newCmds = new ArrayList<String>(command.size());
String appIdStr = app.getAppId().toString();
Path containerLogDir =
this.logDirsSelector.getLocalPathForWrite(ContainerLaunch
.getRelativeContainerLogDir(appIdStr, containerIdStr),
LocalDirAllocator.SIZE_UNKNOWN, this.conf, false);
dirsHandler.getLogPathForWrite(ContainerLaunch
.getRelativeContainerLogDir(appIdStr, containerIdStr), false);
for (String str : command) {
// TODO: Should we instead work via symlinks without this grammar?
newCmds.add(str.replace(ApplicationConstants.LOG_DIR_EXPANSION_VAR,
@ -144,47 +146,49 @@ public Integer call() {
// /////////////////////////// End of variable expansion
FileContext lfs = FileContext.getLocalFSFileContext();
LocalDirAllocator lDirAllocator =
new LocalDirAllocator(YarnConfiguration.NM_LOCAL_DIRS); // TODO
Path nmPrivateContainerScriptPath =
lDirAllocator.getLocalPathForWrite(
dirsHandler.getLocalPathForWrite(
getContainerPrivateDir(appIdStr, containerIdStr) + Path.SEPARATOR
+ CONTAINER_SCRIPT, this.conf);
+ CONTAINER_SCRIPT);
Path nmPrivateTokensPath =
lDirAllocator.getLocalPathForWrite(
dirsHandler.getLocalPathForWrite(
getContainerPrivateDir(appIdStr, containerIdStr)
+ Path.SEPARATOR
+ String.format(ContainerLocalizer.TOKEN_FILE_NAME_FMT,
containerIdStr), this.conf);
containerIdStr));
DataOutputStream containerScriptOutStream = null;
DataOutputStream tokensOutStream = null;
// Select the working directory for the container
Path containerWorkDir =
lDirAllocator.getLocalPathForWrite(ContainerLocalizer.USERCACHE
dirsHandler.getLocalPathForWrite(ContainerLocalizer.USERCACHE
+ Path.SEPARATOR + user + Path.SEPARATOR
+ ContainerLocalizer.APPCACHE + Path.SEPARATOR + appIdStr
+ Path.SEPARATOR + containerIdStr,
LocalDirAllocator.SIZE_UNKNOWN, this.conf, false);
LocalDirAllocator.SIZE_UNKNOWN, false);
String pidFileSuffix = String.format(ContainerLaunch.PID_FILE_NAME_FMT,
containerIdStr);
// pid file should be in nm private dir so that it is not
// accessible by users
pidFilePath = lDirAllocator.getLocalPathForWrite(
pidFilePath = dirsHandler.getLocalPathForWrite(
ResourceLocalizationService.NM_PRIVATE_DIR + Path.SEPARATOR
+ pidFileSuffix,
this.conf);
+ pidFileSuffix);
List<String> localDirs = dirsHandler.getLocalDirs();
List<String> logDirs = dirsHandler.getLogDirs();
if (!dirsHandler.areDisksHealthy()) {
ret = ExitCode.DISKS_FAILED.getExitCode();
throw new IOException("Most of the disks failed. "
+ dirsHandler.getDisksHealthReport());
}
try {
// /////////// Write out the container-script in the nmPrivate space.
String[] localDirs =
this.conf.getStrings(YarnConfiguration.NM_LOCAL_DIRS,
YarnConfiguration.DEFAULT_NM_LOCAL_DIRS);
List<Path> appDirs = new ArrayList<Path>(localDirs.length);
List<Path> appDirs = new ArrayList<Path>(localDirs.size());
for (String localDir : localDirs) {
Path usersdir = new Path(localDir, ContainerLocalizer.USERCACHE);
Path userdir = new Path(usersdir, user);
@ -234,30 +238,34 @@ public Integer call() {
}
else {
exec.activateContainer(containerID, pidFilePath);
ret =
exec.launchContainer(container, nmPrivateContainerScriptPath,
nmPrivateTokensPath, user, appIdStr, containerWorkDir);
ret = exec.launchContainer(container, nmPrivateContainerScriptPath,
nmPrivateTokensPath, user, appIdStr, containerWorkDir,
localDirs, logDirs);
}
} catch (Throwable e) {
LOG.warn("Failed to launch container", e);
LOG.warn("Failed to launch container.", e);
dispatcher.getEventHandler().handle(new ContainerExitEvent(
launchContext.getContainerId(),
ContainerEventType.CONTAINER_EXITED_WITH_FAILURE, ret));
ContainerEventType.CONTAINER_EXITED_WITH_FAILURE, ret,
e.getMessage()));
return ret;
} finally {
completed.set(true);
exec.deactivateContainer(containerID);
}
LOG.debug("Container " + containerIdStr + " completed with exit code "
+ ret);
if (LOG.isDebugEnabled()) {
LOG.debug("Container " + containerIdStr + " completed with exit code "
+ ret);
}
if (ret == ExitCode.FORCE_KILLED.getExitCode()
|| ret == ExitCode.TERMINATED.getExitCode()) {
// If the process was killed, Send container_cleanedup_after_kill and
// just break out of this method.
dispatcher.getEventHandler().handle(
new ContainerExitEvent(launchContext.getContainerId(),
ContainerEventType.CONTAINER_KILLED_ON_REQUEST, ret));
ContainerEventType.CONTAINER_KILLED_ON_REQUEST, ret,
"Container exited with a non-zero exit code " + ret));
return ret;
}
@ -265,7 +273,8 @@ public Integer call() {
LOG.warn("Container exited with a non-zero exit code " + ret);
this.dispatcher.getEventHandler().handle(new ContainerExitEvent(
launchContext.getContainerId(),
ContainerEventType.CONTAINER_EXITED_WITH_FAILURE, ret));
ContainerEventType.CONTAINER_EXITED_WITH_FAILURE, ret,
"Container exited with a non-zero exit code " + ret));
return ret;
}

View File

@ -33,10 +33,10 @@
import org.apache.hadoop.fs.UnsupportedFileSystemException;
import org.apache.hadoop.yarn.YarnException;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.event.Dispatcher;
import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor;
import org.apache.hadoop.yarn.server.nodemanager.LocalDirsHandlerService;
import org.apache.hadoop.yarn.server.nodemanager.Context;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.Application;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
@ -59,6 +59,8 @@ public class ContainersLauncher extends AbstractService
private final Context context;
private final ContainerExecutor exec;
private final Dispatcher dispatcher;
private LocalDirsHandlerService dirsHandler;
private final ExecutorService containerLauncher =
Executors.newCachedThreadPool(
new ThreadFactoryBuilder()
@ -80,11 +82,12 @@ public RunningContainer(Future<Integer> submit,
public ContainersLauncher(Context context, Dispatcher dispatcher,
ContainerExecutor exec) {
ContainerExecutor exec, LocalDirsHandlerService dirsHandler) {
super("containers-launcher");
this.exec = exec;
this.context = context;
this.dispatcher = dispatcher;
this.dirsHandler = dirsHandler;
}
@Override
@ -114,15 +117,19 @@ public void handle(ContainersLauncherEvent event) {
Application app =
context.getApplications().get(
containerId.getApplicationAttemptId().getApplicationId());
ContainerLaunch launch =
new ContainerLaunch(getConfig(), dispatcher, exec, app,
event.getContainer());
ContainerLaunch launch = new ContainerLaunch(getConfig(), dispatcher,
exec, app, event.getContainer(), dirsHandler);
running.put(containerId,
new RunningContainer(containerLauncher.submit(launch),
launch));
break;
case CLEANUP_CONTAINER:
RunningContainer rContainerDatum = running.remove(containerId);
if (rContainerDatum == null) {
// Container not launched. So nothing needs to be done.
return;
}
Future<Integer> rContainer = rContainerDatum.runningcontainer;
if (rContainer != null
&& !rContainer.isDone()) {

View File

@ -45,12 +45,10 @@
import org.apache.hadoop.fs.LocalDirAllocator;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.security.SecurityInfo;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.security.token.TokenIdentifier;
import org.apache.hadoop.yarn.api.records.LocalResource;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnRemoteException;
import org.apache.hadoop.yarn.factories.RecordFactory;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
@ -61,7 +59,6 @@
import org.apache.hadoop.yarn.server.nodemanager.api.protocolrecords.LocalizerHeartbeatResponse;
import org.apache.hadoop.yarn.server.nodemanager.api.protocolrecords.LocalizerStatus;
import org.apache.hadoop.yarn.server.nodemanager.api.protocolrecords.ResourceStatusType;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.security.LocalizerSecurityInfo;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.security.LocalizerTokenIdentifier;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.security.LocalizerTokenSecretManager;
import org.apache.hadoop.yarn.util.ConverterUtils;
@ -186,16 +183,30 @@ ExecutorService createDownloadThreadPool() {
}
Callable<Path> download(LocalDirAllocator lda, LocalResource rsrc,
UserGroupInformation ugi) {
return new FSDownload(lfs, ugi, conf, lda, rsrc, new Random());
UserGroupInformation ugi) throws IOException {
Path destPath = lda.getLocalPathForWrite(".", getEstimatedSize(rsrc), conf);
return new FSDownload(lfs, ugi, conf, destPath, rsrc, new Random());
}
static long getEstimatedSize(LocalResource rsrc) {
if (rsrc.getSize() < 0) {
return -1;
}
switch (rsrc.getType()) {
case ARCHIVE:
return 5 * rsrc.getSize();
case FILE:
default:
return rsrc.getSize();
}
}
void sleep(int duration) throws InterruptedException {
TimeUnit.SECONDS.sleep(duration);
}
private void localizeFiles(LocalizationProtocol nodemanager, ExecutorService exec,
UserGroupInformation ugi) {
private void localizeFiles(LocalizationProtocol nodemanager,
ExecutorService exec, UserGroupInformation ugi) throws IOException {
while (true) {
try {
LocalizerStatus status = createStatus();

View File

@ -57,7 +57,6 @@
import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
@ -68,7 +67,6 @@
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
import org.apache.hadoop.fs.FileContext;
import org.apache.hadoop.fs.LocalDirAllocator;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.net.NetUtils;
@ -81,6 +79,7 @@
import org.apache.hadoop.yarn.ipc.YarnRPC;
import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor;
import org.apache.hadoop.yarn.server.nodemanager.DeletionService;
import org.apache.hadoop.yarn.server.nodemanager.LocalDirsHandlerService;
import org.apache.hadoop.yarn.server.nodemanager.api.LocalizationProtocol;
import org.apache.hadoop.yarn.server.nodemanager.api.protocolrecords.LocalResourceStatus;
import org.apache.hadoop.yarn.server.nodemanager.api.protocolrecords.LocalizerAction;
@ -125,19 +124,18 @@ public class ResourceLocalizationService extends CompositeService
private InetSocketAddress localizationServerAddress;
private long cacheTargetSize;
private long cacheCleanupPeriod;
private List<Path> logDirs;
private List<Path> localDirs;
private List<Path> sysDirs;
private final ContainerExecutor exec;
protected final Dispatcher dispatcher;
private final DeletionService delService;
private LocalizerTracker localizerTracker;
private RecordFactory recordFactory;
private final LocalDirAllocator localDirsSelector;
private final ScheduledExecutorService cacheCleanup;
private final LocalResourcesTracker publicRsrc;
private LocalDirsHandlerService dirsHandler;
/**
* Map of LocalResourceTrackers keyed by username, for private
* resources.
@ -153,12 +151,15 @@ public class ResourceLocalizationService extends CompositeService
new ConcurrentHashMap<String,LocalResourcesTracker>();
public ResourceLocalizationService(Dispatcher dispatcher,
ContainerExecutor exec, DeletionService delService) {
ContainerExecutor exec, DeletionService delService,
LocalDirsHandlerService dirsHandler) {
super(ResourceLocalizationService.class.getName());
this.exec = exec;
this.dispatcher = dispatcher;
this.delService = delService;
this.localDirsSelector = new LocalDirAllocator(YarnConfiguration.NM_LOCAL_DIRS);
this.dirsHandler = dirsHandler;
this.publicRsrc = new LocalResourcesTrackerImpl(null, dispatcher);
this.cacheCleanup = new ScheduledThreadPoolExecutor(1,
new ThreadFactoryBuilder()
@ -177,41 +178,31 @@ FileContext getLocalFileContext(Configuration conf) {
@Override
public void init(Configuration conf) {
this.recordFactory = RecordFactoryProvider.getRecordFactory(conf);
try {
// TODO queue deletions here, rather than NM init?
FileContext lfs = getLocalFileContext(conf);
String[] sLocalDirs =
conf.getStrings(YarnConfiguration.NM_LOCAL_DIRS, YarnConfiguration.DEFAULT_NM_LOCAL_DIRS);
localDirs = new ArrayList<Path>(sLocalDirs.length);
logDirs = new ArrayList<Path>(sLocalDirs.length);
sysDirs = new ArrayList<Path>(sLocalDirs.length);
for (String sLocaldir : sLocalDirs) {
Path localdir = new Path(sLocaldir);
localDirs.add(localdir);
List<String> localDirs = dirsHandler.getLocalDirs();
for (String localDir : localDirs) {
// $local/usercache
Path userdir = new Path(localdir, ContainerLocalizer.USERCACHE);
lfs.mkdir(userdir, null, true);
Path userDir = new Path(localDir, ContainerLocalizer.USERCACHE);
lfs.mkdir(userDir, null, true);
// $local/filecache
Path filedir = new Path(localdir, ContainerLocalizer.FILECACHE);
lfs.mkdir(filedir, null, true);
Path fileDir = new Path(localDir, ContainerLocalizer.FILECACHE);
lfs.mkdir(fileDir, null, true);
// $local/nmPrivate
Path sysdir = new Path(localdir, NM_PRIVATE_DIR);
lfs.mkdir(sysdir, NM_PRIVATE_PERM, true);
sysDirs.add(sysdir);
Path sysDir = new Path(localDir, NM_PRIVATE_DIR);
lfs.mkdir(sysDir, NM_PRIVATE_PERM, true);
}
String[] sLogdirs = conf.getStrings(YarnConfiguration.NM_LOG_DIRS, YarnConfiguration.DEFAULT_NM_LOG_DIRS);
for (String sLogdir : sLogdirs) {
Path logdir = new Path(sLogdir);
logDirs.add(logdir);
lfs.mkdir(logdir, null, true);
List<String> logDirs = dirsHandler.getLogDirs();
for (String logDir : logDirs) {
lfs.mkdir(new Path(logDir), null, true);
}
} catch (IOException e) {
throw new YarnException("Failed to initialize LocalizationService", e);
}
localDirs = Collections.unmodifiableList(localDirs);
logDirs = Collections.unmodifiableList(logDirs);
sysDirs = Collections.unmodifiableList(sysDirs);
cacheTargetSize =
conf.getLong(YarnConfiguration.NM_LOCALIZER_CACHE_TARGET_SIZE_MB, YarnConfiguration.DEFAULT_NM_LOCALIZER_CACHE_TARGET_SIZE_MB) << 20;
cacheCleanupPeriod =
@ -391,7 +382,7 @@ private void handleCleanupContainerResources(
String containerIDStr = c.toString();
String appIDStr = ConverterUtils.toString(
c.getContainerID().getApplicationAttemptId().getApplicationId());
for (Path localDir : localDirs) {
for (String localDir : dirsHandler.getLocalDirs()) {
// Delete the user-owned container-dir
Path usersdir = new Path(localDir, ContainerLocalizer.USERCACHE);
@ -428,7 +419,7 @@ private void handleDestroyApplicationResources(Application application) {
// Delete the application directories
userName = application.getUser();
appIDStr = application.toString();
for (Path localDir : localDirs) {
for (String localDir : dirsHandler.getLocalDirs()) {
// Delete the user-owned app-dir
Path usersdir = new Path(localDir, ContainerLocalizer.USERCACHE);
@ -574,12 +565,9 @@ private static ExecutorService createLocalizerExecutor(Configuration conf) {
class PublicLocalizer extends Thread {
static final String PUBCACHE_CTXT = "public.cache.dirs";
final FileContext lfs;
final Configuration conf;
final ExecutorService threadPool;
final LocalDirAllocator publicDirs;
final CompletionService<Path> queue;
final Map<Future<Path>,LocalizerResourceRequestEvent> pending;
// TODO hack to work around broken signaling
@ -601,13 +589,23 @@ class PublicLocalizer extends Thread {
this.conf = conf;
this.pending = pending;
this.attempts = attempts;
String[] publicFilecache = new String[localDirs.size()];
for (int i = 0, n = localDirs.size(); i < n; ++i) {
publicFilecache[i] =
new Path(localDirs.get(i), ContainerLocalizer.FILECACHE).toString();
}
conf.setStrings(PUBCACHE_CTXT, publicFilecache);
this.publicDirs = new LocalDirAllocator(PUBCACHE_CTXT);
// List<String> localDirs = dirsHandler.getLocalDirs();
// String[] publicFilecache = new String[localDirs.size()];
// for (int i = 0, n = localDirs.size(); i < n; ++i) {
// publicFilecache[i] =
// new Path(localDirs.get(i), ContainerLocalizer.FILECACHE).toString();
// }
// conf.setStrings(PUBCACHE_CTXT, publicFilecache);
// this.publicDirDestPath = new LocalDirAllocator(PUBCACHE_CTXT).getLocalPathForWrite(pathStr, conf);
// List<String> localDirs = dirsHandler.getLocalDirs();
// String[] publicFilecache = new String[localDirs.size()];
// int i = 0;
// for (String localDir : localDirs) {
// publicFilecache[i++] =
// new Path(localDir, ContainerLocalizer.FILECACHE).toString();
// }
this.threadPool = threadPool;
this.queue = new ExecutorCompletionService<Path>(threadPool);
}
@ -619,11 +617,19 @@ public void addResource(LocalizerResourceRequestEvent request) {
synchronized (attempts) {
List<LocalizerResourceRequestEvent> sigh = attempts.get(key);
if (null == sigh) {
pending.put(queue.submit(new FSDownload(
lfs, null, conf, publicDirs,
request.getResource().getRequest(), new Random())),
request);
attempts.put(key, new LinkedList<LocalizerResourceRequestEvent>());
LocalResource resource = request.getResource().getRequest();
try {
Path publicDirDestPath = dirsHandler.getLocalPathForWrite(
"." + Path.SEPARATOR + ContainerLocalizer.FILECACHE,
ContainerLocalizer.getEstimatedSize(resource), true);
pending.put(queue.submit(new FSDownload(
lfs, null, conf, publicDirDestPath, resource, new Random())),
request);
attempts.put(key, new LinkedList<LocalizerResourceRequestEvent>());
} catch (IOException e) {
LOG.error("Local path for public localization is not found. "
+ " May be disks failed.", e);
}
} else {
sigh.add(request);
}
@ -844,24 +850,30 @@ LocalizerHeartbeatResponse update(
public void run() {
Path nmPrivateCTokensPath = null;
try {
// Use LocalDirAllocator to get nmPrivateDir
// Get nmPrivateDir
nmPrivateCTokensPath =
localDirsSelector.getLocalPathForWrite(
NM_PRIVATE_DIR
+ Path.SEPARATOR
dirsHandler.getLocalPathForWrite(
NM_PRIVATE_DIR + Path.SEPARATOR
+ String.format(ContainerLocalizer.TOKEN_FILE_NAME_FMT,
localizerId), getConfig());
localizerId));
// 0) init queue, etc.
// 1) write credentials to private dir
writeCredentials(nmPrivateCTokensPath);
// 2) exec initApplication and wait
exec.startLocalizer(nmPrivateCTokensPath, localizationServerAddress,
context.getUser(),
ConverterUtils.toString(
context.getContainerId().
getApplicationAttemptId().getApplicationId()),
localizerId, localDirs);
List<String> localDirs = dirsHandler.getLocalDirs();
List<String> logDirs = dirsHandler.getLogDirs();
if (dirsHandler.areDisksHealthy()) {
exec.startLocalizer(nmPrivateCTokensPath, localizationServerAddress,
context.getUser(),
ConverterUtils.toString(
context.getContainerId().
getApplicationAttemptId().getApplicationId()),
localizerId, localDirs, logDirs);
} else {
throw new IOException("All disks failed. "
+ dirsHandler.getDisksHealthReport());
}
// TODO handle ExitCodeException separately?
} catch (Exception e) {
LOG.info("Localizer failed", e);

View File

@ -20,6 +20,7 @@
import java.io.IOException;
import java.security.PrivilegedExceptionAction;
import java.util.List;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
@ -31,6 +32,7 @@
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
import org.apache.hadoop.yarn.api.records.ContainerId;
@ -40,10 +42,12 @@
import org.apache.hadoop.yarn.logaggregation.AggregatedLogFormat.LogValue;
import org.apache.hadoop.yarn.logaggregation.AggregatedLogFormat.LogWriter;
import org.apache.hadoop.yarn.server.nodemanager.DeletionService;
import org.apache.hadoop.yarn.server.nodemanager.LocalDirsHandlerService;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationEventType;
import org.apache.hadoop.yarn.util.ConverterUtils;
public class AppLogAggregatorImpl implements AppLogAggregator {
private static final Log LOG = LogFactory
@ -51,6 +55,7 @@ public class AppLogAggregatorImpl implements AppLogAggregator {
private static final int THREAD_SLEEP_TIME = 1000;
private static final String TMP_FILE_SUFFIX = ".tmp";
private final LocalDirsHandlerService dirsHandler;
private final Dispatcher dispatcher;
private final ApplicationId appId;
private final String applicationId;
@ -58,7 +63,6 @@ public class AppLogAggregatorImpl implements AppLogAggregator {
private final Configuration conf;
private final DeletionService delService;
private final UserGroupInformation userUgi;
private final String[] rootLogDirs;
private final Path remoteNodeLogFileForApp;
private final Path remoteNodeTmpLogFileForApp;
private final ContainerLogsRetentionPolicy retentionPolicy;
@ -72,7 +76,7 @@ public class AppLogAggregatorImpl implements AppLogAggregator {
public AppLogAggregatorImpl(Dispatcher dispatcher,
DeletionService deletionService, Configuration conf, ApplicationId appId,
UserGroupInformation userUgi, String[] localRootLogDirs,
UserGroupInformation userUgi, LocalDirsHandlerService dirsHandler,
Path remoteNodeLogFileForApp,
ContainerLogsRetentionPolicy retentionPolicy,
Map<ApplicationAccessType, String> appAcls) {
@ -82,7 +86,7 @@ public AppLogAggregatorImpl(Dispatcher dispatcher,
this.appId = appId;
this.applicationId = ConverterUtils.toString(appId);
this.userUgi = userUgi;
this.rootLogDirs = localRootLogDirs;
this.dirsHandler = dirsHandler;
this.remoteNodeLogFileForApp = remoteNodeLogFileForApp;
this.remoteNodeTmpLogFileForApp = getRemoteNodeTmpLogFileForApp();
this.retentionPolicy = retentionPolicy;
@ -115,9 +119,11 @@ private void uploadLogsForContainer(ContainerId containerId) {
}
}
LOG.info("Uploading logs for container " + containerId);
LOG.info("Uploading logs for container " + containerId
+ ". Current good log dirs are "
+ StringUtils.join(",", dirsHandler.getLogDirs()));
LogKey logKey = new LogKey(containerId);
LogValue logValue = new LogValue(this.rootLogDirs, containerId);
LogValue logValue = new LogValue(dirsHandler.getLogDirs(), containerId);
try {
this.writer.append(logKey, logValue);
} catch (IOException e) {
@ -150,9 +156,10 @@ public void run() {
}
// Remove the local app-log-dirs
Path[] localAppLogDirs = new Path[this.rootLogDirs.length];
List<String> rootLogDirs = dirsHandler.getLogDirs();
Path[] localAppLogDirs = new Path[rootLogDirs.size()];
int index = 0;
for (String rootLogDir : this.rootLogDirs) {
for (String rootLogDir : rootLogDirs) {
localAppLogDirs[index] = new Path(rootLogDir, this.applicationId);
index++;
}

View File

@ -47,6 +47,7 @@
import org.apache.hadoop.yarn.logaggregation.LogAggregationUtils;
import org.apache.hadoop.yarn.server.nodemanager.Context;
import org.apache.hadoop.yarn.server.nodemanager.DeletionService;
import org.apache.hadoop.yarn.server.nodemanager.LocalDirsHandlerService;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.LogHandler;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.event.LogHandlerAppFinishedEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.event.LogHandlerAppStartedEvent;
@ -85,7 +86,7 @@ public class LogAggregationService extends AbstractService implements
private final DeletionService deletionService;
private final Dispatcher dispatcher;
private String[] localRootLogDirs;
private LocalDirsHandlerService dirsHandler;
Path remoteRootLogDir;
String remoteRootLogDirSuffix;
private NodeId nodeId;
@ -95,11 +96,12 @@ public class LogAggregationService extends AbstractService implements
private final ExecutorService threadPool;
public LogAggregationService(Dispatcher dispatcher, Context context,
DeletionService deletionService) {
DeletionService deletionService, LocalDirsHandlerService dirsHandler) {
super(LogAggregationService.class.getName());
this.dispatcher = dispatcher;
this.context = context;
this.deletionService = deletionService;
this.dirsHandler = dirsHandler;
this.appLogAggregators =
new ConcurrentHashMap<ApplicationId, AppLogAggregator>();
this.threadPool = Executors.newCachedThreadPool(
@ -109,9 +111,6 @@ public LogAggregationService(Dispatcher dispatcher, Context context,
}
public synchronized void init(Configuration conf) {
this.localRootLogDirs =
conf.getStrings(YarnConfiguration.NM_LOG_DIRS,
YarnConfiguration.DEFAULT_NM_LOG_DIRS);
this.remoteRootLogDir =
new Path(conf.get(YarnConfiguration.NM_REMOTE_APP_LOG_DIR,
YarnConfiguration.DEFAULT_NM_REMOTE_APP_LOG_DIR));
@ -291,9 +290,10 @@ private void initApp(final ApplicationId appId, String user,
// New application
AppLogAggregator appLogAggregator =
new AppLogAggregatorImpl(this.dispatcher, this.deletionService, getConfig(), appId,
userUgi, this.localRootLogDirs,
getRemoteNodeLogFileForApp(appId, user), logRetentionPolicy, appAcls);
new AppLogAggregatorImpl(this.dispatcher, this.deletionService,
getConfig(), appId, userUgi, dirsHandler,
getRemoteNodeLogFileForApp(appId, user), logRetentionPolicy,
appAcls);
if (this.appLogAggregators.putIfAbsent(appId, appLogAggregator) != null) {
throw new YarnException("Duplicate initApp for " + appId);
}

View File

@ -17,6 +17,7 @@
*/
package org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ScheduledThreadPoolExecutor;
@ -31,6 +32,7 @@
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.event.Dispatcher;
import org.apache.hadoop.yarn.server.nodemanager.DeletionService;
import org.apache.hadoop.yarn.server.nodemanager.LocalDirsHandlerService;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationEventType;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.event.LogHandlerAppFinishedEvent;
@ -53,15 +55,16 @@ public class NonAggregatingLogHandler extends AbstractService implements
private final DeletionService delService;
private final Map<ApplicationId, String> appOwners;
private String[] rootLogDirs;
private final LocalDirsHandlerService dirsHandler;
private long deleteDelaySeconds;
private ScheduledThreadPoolExecutor sched;
public NonAggregatingLogHandler(Dispatcher dispatcher,
DeletionService delService) {
DeletionService delService, LocalDirsHandlerService dirsHandler) {
super(NonAggregatingLogHandler.class.getName());
this.dispatcher = dispatcher;
this.delService = delService;
this.dirsHandler = dirsHandler;
this.appOwners = new ConcurrentHashMap<ApplicationId, String>();
}
@ -70,9 +73,6 @@ public void init(Configuration conf) {
// Default 3 hours.
this.deleteDelaySeconds =
conf.getLong(YarnConfiguration.NM_LOG_RETAIN_SECONDS, 3 * 60 * 60);
this.rootLogDirs =
conf.getStrings(YarnConfiguration.NM_LOG_DIRS,
YarnConfiguration.DEFAULT_NM_LOG_DIRS);
sched = createScheduledThreadPoolExecutor(conf);
super.init(conf);
}
@ -145,10 +145,11 @@ public LogDeleterRunnable(String user, ApplicationId applicationId) {
@Override
@SuppressWarnings("unchecked")
public void run() {
Path[] localAppLogDirs =
new Path[NonAggregatingLogHandler.this.rootLogDirs.length];
List<String> rootLogDirs =
NonAggregatingLogHandler.this.dirsHandler.getLogDirs();
Path[] localAppLogDirs = new Path[rootLogDirs.size()];
int index = 0;
for (String rootLogDir : NonAggregatingLogHandler.this.rootLogDirs) {
for (String rootLogDir : rootLogDirs) {
localAppLogDirs[index] = new Path(rootLogDir, applicationId.toString());
index++;
}

View File

@ -34,15 +34,14 @@
import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.LocalDirAllocator;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.server.nodemanager.Context;
import org.apache.hadoop.yarn.server.nodemanager.LocalDirsHandlerService;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.Application;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerState;
@ -87,17 +86,18 @@ protected Class<? extends SubView> content() {
public static class ContainersLogsBlock extends HtmlBlock implements
YarnWebParams {
private final Configuration conf;
private final LocalDirAllocator logsSelector;
private final Context nmContext;
private final ApplicationACLsManager aclsManager;
private final LocalDirsHandlerService dirsHandler;
@Inject
public ContainersLogsBlock(Configuration conf, Context context,
ApplicationACLsManager aclsManager) {
ApplicationACLsManager aclsManager,
LocalDirsHandlerService dirsHandler) {
this.conf = conf;
this.logsSelector = new LocalDirAllocator(YarnConfiguration.NM_LOG_DIRS);
this.nmContext = context;
this.aclsManager = aclsManager;
this.dirsHandler = dirsHandler;
}
@Override
@ -198,11 +198,10 @@ private void printLogs(Block html, ContainerId containerId,
File logFile = null;
try {
logFile =
new File(this.logsSelector
.getLocalPathToRead(
ContainerLaunch.getRelativeContainerLogDir(
applicationId.toString(), containerId.toString())
+ Path.SEPARATOR + $(CONTAINER_LOG_TYPE), this.conf)
new File(this.dirsHandler.getLogPathToRead(
ContainerLaunch.getRelativeContainerLogDir(
applicationId.toString(), containerId.toString())
+ Path.SEPARATOR + $(CONTAINER_LOG_TYPE))
.toUri().getPath());
} catch (Exception e) {
html.h1("Cannot find this log on the local disk.");
@ -272,8 +271,8 @@ private void printLogs(Block html, ContainerId containerId,
}
} else {
// Just print out the log-types
List<File> containerLogsDirs =
getContainerLogDirs(this.conf, containerId);
List<File> containerLogsDirs = getContainerLogDirs(containerId,
dirsHandler);
boolean foundLogFile = false;
for (File containerLogsDir : containerLogsDirs) {
for (File logFile : containerLogsDir.listFiles()) {
@ -293,11 +292,10 @@ private void printLogs(Block html, ContainerId containerId,
return;
}
static List<File>
getContainerLogDirs(Configuration conf, ContainerId containerId) {
String[] logDirs = conf.getStrings(YarnConfiguration.NM_LOG_DIRS,
YarnConfiguration.DEFAULT_NM_LOG_DIRS);
List<File> containerLogDirs = new ArrayList<File>(logDirs.length);
static List<File> getContainerLogDirs(ContainerId containerId,
LocalDirsHandlerService dirsHandler) {
List<String> logDirs = dirsHandler.getLogDirs();
List<File> containerLogDirs = new ArrayList<File>(logDirs.size());
for (String logDir : logDirs) {
String appIdStr =
ConverterUtils.toString(

View File

@ -26,6 +26,7 @@
import org.apache.hadoop.yarn.YarnException;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.server.nodemanager.Context;
import org.apache.hadoop.yarn.server.nodemanager.LocalDirsHandlerService;
import org.apache.hadoop.yarn.server.nodemanager.ResourceView;
import org.apache.hadoop.yarn.server.security.ApplicationACLsManager;
import org.apache.hadoop.yarn.service.AbstractService;
@ -42,10 +43,11 @@ public class WebServer extends AbstractService {
private WebApp webApp;
public WebServer(Context nmContext, ResourceView resourceView,
ApplicationACLsManager aclsManager) {
ApplicationACLsManager aclsManager,
LocalDirsHandlerService dirsHandler) {
super(WebServer.class.getName());
this.nmContext = nmContext;
this.nmWebApp = new NMWebApp(resourceView, aclsManager);
this.nmWebApp = new NMWebApp(resourceView, aclsManager, dirsHandler);
}
@Override
@ -81,17 +83,21 @@ public static class NMWebApp extends WebApp implements YarnWebParams {
private final ResourceView resourceView;
private final ApplicationACLsManager aclsManager;
private final LocalDirsHandlerService dirsHandler;
public NMWebApp(ResourceView resourceView,
ApplicationACLsManager aclsManager) {
ApplicationACLsManager aclsManager,
LocalDirsHandlerService dirsHandler) {
this.resourceView = resourceView;
this.aclsManager = aclsManager;
this.dirsHandler = dirsHandler;
}
@Override
public void setup() {
bind(ResourceView.class).toInstance(this.resourceView);
bind(ApplicationACLsManager.class).toInstance(this.aclsManager);
bind(LocalDirsHandlerService.class).toInstance(dirsHandler);
route("/", NMController.class, "info");
route("/node", NMController.class, "node");
route("/allApplications", NMController.class, "allApplications");

View File

@ -261,8 +261,15 @@ char * get_value(const char* key) {
* Value delimiter is assumed to be a comma.
*/
char ** get_values(const char * key) {
char ** toPass = NULL;
char *value = get_value(key);
return extract_values(value);
}
/**
* Extracts array of values from the comma separated list of values.
*/
char ** extract_values(char *value) {
char ** toPass = NULL;
char *tempTok = NULL;
char *tempstr = NULL;
int size = 0;
@ -276,8 +283,7 @@ char ** get_values(const char * key) {
toPass[size++] = tempTok;
if(size == toPassSize) {
toPassSize += MAX_SIZE;
toPass = (char **) realloc(toPass,(sizeof(char *) *
(MAX_SIZE * toPassSize)));
toPass = (char **) realloc(toPass,(sizeof(char *) * toPassSize));
}
tempTok = strtok_r(NULL, ",", &tempstr);
}

View File

@ -34,6 +34,9 @@ char *get_value(const char* key);
//comma seperated strings.
char ** get_values(const char* key);
// Extracts array of values from the comma separated list of values.
char ** extract_values(char *value);
// free the memory returned by get_values
void free_values(char** values);

View File

@ -357,7 +357,7 @@ int mkdirs(const char* path, mode_t perm) {
* It creates the container work and log directories.
*/
static int create_container_directories(const char* user, const char *app_id,
const char *container_id) {
const char *container_id, char* const* local_dir, char* const* log_dir) {
// create dirs as 0750
const mode_t perms = S_IRWXU | S_IRGRP | S_IXGRP;
if (app_id == NULL || container_id == NULL || user == NULL) {
@ -367,20 +367,11 @@ static int create_container_directories(const char* user, const char *app_id,
}
int result = -1;
char **local_dir = get_values(NM_SYS_DIR_KEY);
if (local_dir == NULL) {
fprintf(LOGFILE, "%s is not configured.\n", NM_SYS_DIR_KEY);
return -1;
}
char **local_dir_ptr;
char* const* local_dir_ptr;
for(local_dir_ptr = local_dir; *local_dir_ptr != NULL; ++local_dir_ptr) {
char *container_dir = get_container_work_directory(*local_dir_ptr, user, app_id,
container_id);
if (container_dir == NULL) {
free_values(local_dir);
return -1;
}
if (mkdirs(container_dir, perms) == 0) {
@ -390,7 +381,6 @@ static int create_container_directories(const char* user, const char *app_id,
free(container_dir);
}
free_values(local_dir);
if (result != 0) {
return result;
}
@ -404,19 +394,11 @@ static int create_container_directories(const char* user, const char *app_id,
} else {
sprintf(combined_name, "%s/%s", app_id, container_id);
char **log_dir = get_values(NM_LOG_DIR_KEY);
if (log_dir == NULL) {
free(combined_name);
fprintf(LOGFILE, "%s is not configured.\n", NM_LOG_DIR_KEY);
return -1;
}
char **log_dir_ptr;
char* const* log_dir_ptr;
for(log_dir_ptr = log_dir; *log_dir_ptr != NULL; ++log_dir_ptr) {
char *container_log_dir = get_app_log_directory(*log_dir_ptr, combined_name);
if (container_log_dir == NULL) {
free(combined_name);
free_values(log_dir);
return -1;
} else if (mkdirs(container_log_dir, perms) != 0) {
free(container_log_dir);
@ -426,7 +408,6 @@ static int create_container_directories(const char* user, const char *app_id,
}
}
free(combined_name);
free_values(log_dir);
}
return result;
}
@ -660,17 +641,12 @@ static int copy_file(int input, const char* in_filename,
/**
* Function to initialize the user directories of a user.
*/
int initialize_user(const char *user) {
char **local_dir = get_values(NM_SYS_DIR_KEY);
if (local_dir == NULL) {
fprintf(LOGFILE, "%s is not configured.\n", NM_SYS_DIR_KEY);
return INVALID_NM_ROOT_DIRS;
}
int initialize_user(const char *user, char* const* local_dirs) {
char *user_dir;
char **local_dir_ptr = local_dir;
char* const* local_dir_ptr;
int failed = 0;
for(local_dir_ptr = local_dir; *local_dir_ptr != 0; ++local_dir_ptr) {
for(local_dir_ptr = local_dirs; *local_dir_ptr != 0; ++local_dir_ptr) {
user_dir = get_user_directory(*local_dir_ptr, user);
if (user_dir == NULL) {
fprintf(LOGFILE, "Couldn't get userdir directory for %s.\n", user);
@ -682,32 +658,29 @@ int initialize_user(const char *user) {
}
free(user_dir);
}
free_values(local_dir);
return failed ? INITIALIZE_USER_FAILED : 0;
}
/**
* Function to prepare the application directories for the container.
*/
int initialize_app(const char *user, const char *app_id,
const char* nmPrivate_credentials_file, char* const* args) {
int initialize_app(const char *user, const char *app_id,
const char* nmPrivate_credentials_file,
char* const* local_dirs, char* const* log_roots,
char* const* args) {
if (app_id == NULL || user == NULL) {
fprintf(LOGFILE, "Either app_id is null or the user passed is null.\n");
return INVALID_ARGUMENT_NUMBER;
}
// create the user directory on all disks
int result = initialize_user(user);
int result = initialize_user(user, local_dirs);
if (result != 0) {
return result;
}
////////////// create the log directories for the app on all disks
char **log_roots = get_values(NM_LOG_DIR_KEY);
if (log_roots == NULL) {
return INVALID_CONFIG_FILE;
}
char **log_root;
char* const* log_root;
char *any_one_app_log_dir = NULL;
for(log_root=log_roots; *log_root != NULL; ++log_root) {
char *app_log_dir = get_app_log_directory(*log_root, app_id);
@ -722,7 +695,7 @@ int initialize_app(const char *user, const char *app_id,
free(app_log_dir);
}
}
free_values(log_roots);
if (any_one_app_log_dir == NULL) {
fprintf(LOGFILE, "Did not create any app-log directories\n");
return -1;
@ -743,15 +716,9 @@ int initialize_app(const char *user, const char *app_id,
// 750
mode_t permissions = S_IRWXU | S_IRGRP | S_IXGRP;
char **nm_roots = get_values(NM_SYS_DIR_KEY);
if (nm_roots == NULL) {
return INVALID_CONFIG_FILE;
}
char **nm_root;
char* const* nm_root;
char *primary_app_dir = NULL;
for(nm_root=nm_roots; *nm_root != NULL; ++nm_root) {
for(nm_root=local_dirs; *nm_root != NULL; ++nm_root) {
char *app_dir = get_app_directory(*nm_root, user, app_id);
if (app_dir == NULL) {
// try the next one
@ -763,7 +730,7 @@ int initialize_app(const char *user, const char *app_id,
free(app_dir);
}
}
free_values(nm_roots);
if (primary_app_dir == NULL) {
fprintf(LOGFILE, "Did not create any app directories\n");
return -1;
@ -805,9 +772,10 @@ int initialize_app(const char *user, const char *app_id,
}
int launch_container_as_user(const char *user, const char *app_id,
const char *container_id, const char *work_dir,
const char *script_name, const char *cred_file,
const char* pid_file) {
const char *container_id, const char *work_dir,
const char *script_name, const char *cred_file,
const char* pid_file, char* const* local_dirs,
char* const* log_dirs) {
int exit_code = -1;
char *script_file_dest = NULL;
char *cred_file_dest = NULL;
@ -854,7 +822,8 @@ int launch_container_as_user(const char *user, const char *app_id,
goto cleanup;
}
if (create_container_directories(user, app_id, container_id) != 0) {
if (create_container_directories(user, app_id, container_id, local_dirs,
log_dirs) != 0) {
fprintf(LOGFILE, "Could not create container dirs");
goto cleanup;
}

View File

@ -61,8 +61,6 @@ enum errorcodes {
#define NM_APP_DIR_PATTERN USER_DIR_PATTERN "/appcache/%s"
#define CONTAINER_DIR_PATTERN NM_APP_DIR_PATTERN "/%s"
#define CONTAINER_SCRIPT "launch_container.sh"
#define NM_SYS_DIR_KEY "yarn.nodemanager.local-dirs"
#define NM_LOG_DIR_KEY "yarn.nodemanager.log-dirs"
#define CREDENTIALS_FILENAME "container_tokens"
#define MIN_USERID_KEY "min.user.id"
#define BANNED_USERS_KEY "banned.users"
@ -92,12 +90,13 @@ int check_executor_permissions(char *executable_file);
// initialize the application directory
int initialize_app(const char *user, const char *app_id,
const char *credentials, char* const* args);
const char *credentials, char* const* local_dirs,
char* const* log_dirs, char* const* args);
/*
* Function used to launch a container as the provided user. It does the following :
* 1) Creates container work dir and log dir to be accessible by the child
* 2) Copies the script file from the TT to the work directory
* 2) Copies the script file from the NM to the work directory
* 3) Sets up the environment
* 4) Does an execlp on the same in order to replace the current image with
* container image.
@ -109,12 +108,15 @@ int initialize_app(const char *user, const char *app_id,
* @param cred_file the credentials file that needs to be compied to the
* working directory.
* @param pid_file file where pid of process should be written to
* @param local_dirs nodemanager-local-directories to be used
* @param log_dirs nodemanager-log-directories to be used
* @return -1 or errorcode enum value on error (should never return on success).
*/
int launch_container_as_user(const char * user, const char *app_id,
const char *container_id, const char *work_dir,
const char *script_name, const char *cred_file,
const char *pid_file);
const char *pid_file, char* const* local_dirs,
char* const* log_dirs);
/**
* Function used to signal a container launched by the user.
@ -181,7 +183,7 @@ int mkdirs(const char* path, mode_t perm);
/**
* Function to initialize the user directories of a user.
*/
int initialize_user(const char *user);
int initialize_user(const char *user, char* const* local_dirs);
/**
* Create a top level directory for the user.

View File

@ -43,10 +43,11 @@ void display_usage(FILE *stream) {
fprintf(stream,
"Usage: container-executor user command command-args\n");
fprintf(stream, "Commands:\n");
fprintf(stream, " initialize container: %2d appid tokens cmd app...\n",
INITIALIZE_CONTAINER);
fprintf(stream, " initialize container: %2d appid tokens " \
"nm-local-dirs nm-log-dirs cmd app...\n", INITIALIZE_CONTAINER);
fprintf(stream,
" launch container: %2d appid containerid workdir container-script tokens pidfile\n",
" launch container: %2d appid containerid workdir "\
"container-script tokens pidfile nm-local-dirs nm-log-dirs\n",
LAUNCH_CONTAINER);
fprintf(stream, " signal container: %2d container-pid signal\n",
SIGNAL_CONTAINER);
@ -96,6 +97,7 @@ int main(int argc, char **argv) {
char *orig_conf_file = STRINGIFY(HADOOP_CONF_DIR) "/" CONF_FILENAME;
char *conf_file = realpath(orig_conf_file, NULL);
char *local_dirs, *log_dirs;
if (conf_file == NULL) {
fprintf(ERRORFILE, "Configuration file %s not found.\n", orig_conf_file);
@ -158,20 +160,23 @@ int main(int argc, char **argv) {
switch (command) {
case INITIALIZE_CONTAINER:
if (argc < 6) {
fprintf(ERRORFILE, "Too few arguments (%d vs 6) for initialize container\n",
if (argc < 8) {
fprintf(ERRORFILE, "Too few arguments (%d vs 8) for initialize container\n",
argc);
fflush(ERRORFILE);
return INVALID_ARGUMENT_NUMBER;
}
app_id = argv[optind++];
cred_file = argv[optind++];
local_dirs = argv[optind++];// good local dirs as a comma separated list
log_dirs = argv[optind++];// good log dirs as a comma separated list
exit_code = initialize_app(user_detail->pw_name, app_id, cred_file,
argv + optind);
extract_values(local_dirs),
extract_values(log_dirs), argv + optind);
break;
case LAUNCH_CONTAINER:
if (argc < 9) {
fprintf(ERRORFILE, "Too few arguments (%d vs 9) for launch container\n",
if (argc != 11) {
fprintf(ERRORFILE, "Too few arguments (%d vs 11) for launch container\n",
argc);
fflush(ERRORFILE);
return INVALID_ARGUMENT_NUMBER;
@ -182,13 +187,17 @@ int main(int argc, char **argv) {
script_file = argv[optind++];
cred_file = argv[optind++];
pid_file = argv[optind++];
exit_code = launch_container_as_user(user_detail->pw_name, app_id, container_id,
current_dir, script_file, cred_file, pid_file);
local_dirs = argv[optind++];// good local dirs as a comma separated list
log_dirs = argv[optind++];// good log dirs as a comma separated list
exit_code = launch_container_as_user(user_detail->pw_name, app_id,
container_id, current_dir, script_file, cred_file,
pid_file, extract_values(local_dirs),
extract_values(log_dirs));
break;
case SIGNAL_CONTAINER:
if (argc < 5) {
fprintf(ERRORFILE, "Too few arguments (%d vs 5) for signal container\n",
argc);
if (argc != 5) {
fprintf(ERRORFILE, "Wrong number of arguments (%d vs 5) for " \
"signal container\n", argc);
fflush(ERRORFILE);
return INVALID_ARGUMENT_NUMBER;
} else {

View File

@ -28,10 +28,17 @@
#include <sys/stat.h>
#include <sys/wait.h>
#define TEST_ROOT "/tmp/test-container-controller"
#define TEST_ROOT "/tmp/test-container-executor"
#define DONT_TOUCH_FILE "dont-touch-me"
#define NM_LOCAL_DIRS TEST_ROOT "/local-1," TEST_ROOT "/local-2," \
TEST_ROOT "/local-3," TEST_ROOT "/local-4," TEST_ROOT "/local-5"
#define NM_LOG_DIRS TEST_ROOT "/logdir_1," TEST_ROOT "/logdir_2," \
TEST_ROOT "/logdir_3," TEST_ROOT "/logdir_4"
#define ARRAY_SIZE 1000
static char* username = NULL;
static char* local_dirs = NULL;
static char* log_dirs = NULL;
/**
* Run the command using the effective user id.
@ -84,40 +91,33 @@ void run(const char *cmd) {
int write_config_file(char *file_name) {
FILE *file;
int i = 0;
file = fopen(file_name, "w");
if (file == NULL) {
printf("Failed to open %s.\n", file_name);
return EXIT_FAILURE;
}
fprintf(file, "yarn.nodemanager.local-dirs=" TEST_ROOT "/local-1");
for(i=2; i < 5; ++i) {
fprintf(file, "," TEST_ROOT "/local-%d", i);
}
fprintf(file, "\n");
fprintf(file, "yarn.nodemanager.log-dirs=" TEST_ROOT "/logs\n");
fprintf(file, "banned.users=bannedUser\n");
fprintf(file, "min.user.id=1000\n");
fclose(file);
return 0;
}
void create_nm_roots() {
char** nm_roots = get_values(NM_SYS_DIR_KEY);
void create_nm_roots(char ** nm_roots) {
char** nm_root;
for(nm_root=nm_roots; *nm_root != NULL; ++nm_root) {
if (mkdir(*nm_root, 0755) != 0) {
printf("FAIL: Can't create directory %s - %s\n", *nm_root,
strerror(errno));
strerror(errno));
exit(1);
}
char buffer[100000];
sprintf(buffer, "%s/usercache", *nm_root);
if (mkdir(buffer, 0755) != 0) {
printf("FAIL: Can't create directory %s - %s\n", buffer,
strerror(errno));
strerror(errno));
exit(1);
}
}
free_values(nm_roots);
}
void test_get_user_directory() {
@ -209,7 +209,7 @@ void test_check_configuration_permissions() {
}
void test_delete_container() {
if (initialize_user(username)) {
if (initialize_user(username, extract_values(local_dirs))) {
printf("FAIL: failed to initialize user %s\n", username);
exit(1);
}
@ -504,7 +504,8 @@ void test_init_app() {
exit(1);
} else if (child == 0) {
char *final_pgm[] = {"touch", "my-touch-file", 0};
if (initialize_app(username, "app_4", TEST_ROOT "/creds.txt", final_pgm) != 0) {
if (initialize_app(username, "app_4", TEST_ROOT "/creds.txt", final_pgm,
extract_values(local_dirs), extract_values(log_dirs)) != 0) {
printf("FAIL: failed in child\n");
exit(42);
}
@ -598,7 +599,8 @@ void test_run_container() {
exit(1);
} else if (child == 0) {
if (launch_container_as_user(username, "app_4", "container_1",
container_dir, script_name, TEST_ROOT "/creds.txt", pid_file) != 0) {
container_dir, script_name, TEST_ROOT "/creds.txt", pid_file,
extract_values(local_dirs), extract_values(log_dirs)) != 0) {
printf("FAIL: failed in child\n");
exit(42);
}
@ -677,7 +679,12 @@ int main(int argc, char **argv) {
}
read_config(TEST_ROOT "/test.cfg");
create_nm_roots();
local_dirs = (char *) malloc (sizeof(char) * ARRAY_SIZE);
strcpy(local_dirs, NM_LOCAL_DIRS);
log_dirs = (char *) malloc (sizeof(char) * ARRAY_SIZE);
strcpy(log_dirs, NM_LOG_DIRS);
create_nm_roots(extract_values(local_dirs));
if (getuid() == 0 && argc == 2) {
username = argv[1];

View File

@ -60,16 +60,18 @@ public DummyContainerManager(Context context, ContainerExecutor exec,
DeletionService deletionContext, NodeStatusUpdater nodeStatusUpdater,
NodeManagerMetrics metrics,
ContainerTokenSecretManager containerTokenSecretManager,
ApplicationACLsManager applicationACLsManager) {
ApplicationACLsManager applicationACLsManager,
LocalDirsHandlerService dirsHandler) {
super(context, exec, deletionContext, nodeStatusUpdater, metrics,
containerTokenSecretManager, applicationACLsManager);
containerTokenSecretManager, applicationACLsManager, dirsHandler);
}
@Override
@SuppressWarnings("unchecked")
protected ResourceLocalizationService createResourceLocalizationService(ContainerExecutor exec,
DeletionService deletionContext) {
return new ResourceLocalizationService(super.dispatcher, exec, deletionContext) {
protected ResourceLocalizationService createResourceLocalizationService(
ContainerExecutor exec, DeletionService deletionContext) {
return new ResourceLocalizationService(super.dispatcher, exec,
deletionContext, super.dirsHandler) {
@Override
public void handle(LocalizationEvent event) {
switch (event.getType()) {
@ -125,7 +127,8 @@ public void handle(LocalizationEvent event) {
@SuppressWarnings("unchecked")
protected ContainersLauncher createContainersLauncher(Context context,
ContainerExecutor exec) {
return new ContainersLauncher(context, super.dispatcher, exec) {
return new ContainersLauncher(context, super.dispatcher, exec,
super.dirsHandler) {
@Override
public void handle(ContainersLauncherEvent event) {
Container container = event.getContainer();
@ -139,7 +142,8 @@ public void handle(ContainersLauncherEvent event) {
case CLEANUP_CONTAINER:
dispatcher.getEventHandler().handle(
new ContainerExitEvent(containerId,
ContainerEventType.CONTAINER_KILLED_ON_REQUEST, 0));
ContainerEventType.CONTAINER_KILLED_ON_REQUEST, 0,
"Container exited with exit code 0."));
break;
}
}

View File

@ -21,7 +21,6 @@
import java.io.File;
import java.io.IOException;
import org.apache.hadoop.NodeHealthCheckerService;
import org.apache.hadoop.fs.FileContext;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.yarn.api.protocolrecords.StartContainerRequest;
@ -80,9 +79,12 @@ public void testSuccessfulContainerLaunch() throws InterruptedException,
ContainerExecutor exec = new DefaultContainerExecutor();
exec.setConf(conf);
DeletionService del = new DeletionService(exec);
Dispatcher dispatcher = new AsyncDispatcher();
NodeHealthCheckerService healthChecker = null;
NodeHealthCheckerService healthChecker = new NodeHealthCheckerService();
healthChecker.init(conf);
LocalDirsHandlerService dirsHandler = healthChecker.getDiskHandler();
NodeManagerMetrics metrics = NodeManagerMetrics.create();
ContainerTokenSecretManager containerTokenSecretManager = new ContainerTokenSecretManager();
NodeStatusUpdater nodeStatusUpdater =
@ -100,7 +102,8 @@ protected void startStatusUpdater() {
DummyContainerManager containerManager = new DummyContainerManager(
context, exec, del, nodeStatusUpdater, metrics,
containerTokenSecretManager, new ApplicationACLsManager(conf));
containerTokenSecretManager, new ApplicationACLsManager(conf),
dirsHandler);
containerManager.init(conf);
containerManager.start();

View File

@ -37,6 +37,7 @@
import org.apache.hadoop.fs.FileContext;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
@ -63,8 +64,6 @@
* config values.
* <br><pre><code>
* > cat /etc/hadoop/container-executor.cfg
* yarn.nodemanager.local-dirs=/tmp/hadoop/nm-local/
* yarn.nodemanager.log-dirs=/tmp/hadoop/nm-log
* yarn.nodemanager.linux-container-executor.group=mapred
* #depending on the user id of the application.submitter option
* min.user.id=1
@ -72,7 +71,7 @@
* > sudo chmod 444 /etc/hadoop/container-executor.cfg
* </code></pre>
*
* <li>iMove the binary and set proper permissions on it. It needs to be owned
* <li>Move the binary and set proper permissions on it. It needs to be owned
* by root, the group needs to be the group configured in container-executor.cfg,
* and it needs the setuid bit set. (The build will also overwrite it so you
* need to move it to a place that you can support it.
@ -98,14 +97,22 @@ public class TestLinuxContainerExecutor {
private LinuxContainerExecutor exec = null;
private String appSubmitter = null;
private LocalDirsHandlerService dirsHandler;
@Before
public void setup() throws Exception {
FileContext.getLocalFSFileContext().mkdir(
new Path(workSpace.getAbsolutePath()), null, true);
FileContext files = FileContext.getLocalFSFileContext();
Path workSpacePath = new Path(workSpace.getAbsolutePath());
files.mkdir(workSpacePath, null, true);
workSpace.setReadable(true, false);
workSpace.setExecutable(true, false);
workSpace.setWritable(true, false);
File localDir = new File(workSpace.getAbsoluteFile(), "localDir");
files.mkdir(new Path(localDir.getAbsolutePath()),
new FsPermission("777"), false);
File logDir = new File(workSpace.getAbsoluteFile(), "logDir");
files.mkdir(new Path(logDir.getAbsolutePath()),
new FsPermission("777"), false);
String exec_path = System.getProperty("container-executor.path");
if(exec_path != null && !exec_path.isEmpty()) {
Configuration conf = new Configuration(false);
@ -114,6 +121,10 @@ public void setup() throws Exception {
conf.set(YarnConfiguration.NM_LINUX_CONTAINER_EXECUTOR_PATH, exec_path);
exec = new LinuxContainerExecutor();
exec.setConf(conf);
conf.set(YarnConfiguration.NM_LOCAL_DIRS, localDir.getAbsolutePath());
conf.set(YarnConfiguration.NM_LOG_DIRS, logDir.getAbsolutePath());
dirsHandler = new LocalDirsHandlerService();
dirsHandler.init(conf);
}
appSubmitter = System.getProperty("application.submitter");
if(appSubmitter == null || appSubmitter.isEmpty()) {
@ -189,7 +200,8 @@ private int runAndBlock(ContainerId cId, String ... cmd) throws IOException {
exec.activateContainer(cId, pidFile);
return exec.launchContainer(container, scriptPath, tokensPath,
appSubmitter, appId, workDir);
appSubmitter, appId, workDir, dirsHandler.getLocalDirs(),
dirsHandler.getLogDirs());
}

View File

@ -35,6 +35,7 @@
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
@ -51,6 +52,7 @@ public class TestLinuxContainerExecutorWithMocks {
private LinuxContainerExecutor mockExec = null;
private final File mockParamFile = new File("./params.txt");
private LocalDirsHandlerService dirsHandler;
private void deleteMockParamFile() {
if(mockParamFile.exists()) {
@ -80,6 +82,8 @@ public void setup() {
Configuration conf = new Configuration();
conf.set(YarnConfiguration.NM_LINUX_CONTAINER_EXECUTOR_PATH, executorPath);
mockExec = new LinuxContainerExecutor();
dirsHandler = new LocalDirsHandlerService();
dirsHandler.init(conf);
mockExec.setConf(conf);
}
@ -114,10 +118,13 @@ public void testContainerLaunch() throws IOException {
mockExec.activateContainer(cId, pidFile);
int ret = mockExec.launchContainer(container, scriptPath, tokensPath,
appSubmitter, appId, workDir);
appSubmitter, appId, workDir, dirsHandler.getLocalDirs(),
dirsHandler.getLogDirs());
assertEquals(0, ret);
assertEquals(Arrays.asList(appSubmitter, cmd, appId, containerId,
workDir.toString(), "/bin/echo", "/dev/null", pidFile.toString()),
workDir.toString(), "/bin/echo", "/dev/null", pidFile.toString(),
StringUtils.join(",", dirsHandler.getLocalDirs()),
StringUtils.join(",", dirsHandler.getLogDirs())),
readMockParams());
}

View File

@ -16,7 +16,7 @@
* limitations under the License.
*/
package org.apache.hadoop;
package org.apache.hadoop.yarn.server.nodemanager;
import java.io.File;
import java.io.FileOutputStream;
@ -88,24 +88,31 @@ private void writeNodeHealthScriptFile(String scriptStr, boolean setExecutable)
public void testNodeHealthScriptShouldRun() throws IOException {
// Node health script should not start if there is no property called
// node health script path.
Assert.assertFalse("By default Health checker should not have started",
NodeHealthCheckerService.shouldRun(new Configuration()));
Assert.assertFalse("By default Health script should not have started",
NodeHealthScriptRunner.shouldRun(new Configuration()));
Configuration conf = getConfForNodeHealthScript();
// Node health script should not start if the node health script does not
// exists
Assert.assertFalse("Node health script should start", NodeHealthCheckerService
.shouldRun(conf));
Assert.assertFalse("Node health script should start",
NodeHealthScriptRunner.shouldRun(conf));
// Create script path.
conf.writeXml(new FileOutputStream(nodeHealthConfigFile));
conf.addResource(nodeHealthConfigFile.getName());
writeNodeHealthScriptFile("", false);
// Node health script should not start if the node health script is not
// executable.
Assert.assertFalse("Node health script should start", NodeHealthCheckerService
.shouldRun(conf));
Assert.assertFalse("Node health script should start",
NodeHealthScriptRunner.shouldRun(conf));
writeNodeHealthScriptFile("", true);
Assert.assertTrue("Node health script should start", NodeHealthCheckerService
.shouldRun(conf));
Assert.assertTrue("Node health script should start",
NodeHealthScriptRunner.shouldRun(conf));
}
private void setHealthStatus(NodeHealthStatus healthStatus, boolean isHealthy,
String healthReport, long lastHealthReportTime) {
healthStatus.setHealthReport(healthReport);
healthStatus.setIsNodeHealthy(isHealthy);
healthStatus.setLastHealthReportTime(lastHealthReportTime);
}
@Test
@ -120,54 +127,67 @@ public void testNodeHealthScript() throws Exception {
conf.writeXml(new FileOutputStream(nodeHealthConfigFile));
conf.addResource(nodeHealthConfigFile.getName());
NodeHealthCheckerService nodeHealthChecker = new NodeHealthCheckerService(
conf);
TimerTask timer = nodeHealthChecker.getTimer();
writeNodeHealthScriptFile(normalScript, true);
timer.run();
NodeHealthCheckerService nodeHealthChecker = new NodeHealthCheckerService();
nodeHealthChecker.init(conf);
NodeHealthScriptRunner nodeHealthScriptRunner =
nodeHealthChecker.getNodeHealthScriptRunner();
TimerTask timerTask = nodeHealthScriptRunner.getTimerTask();
nodeHealthChecker.setHealthStatus(healthStatus);
timerTask.run();
setHealthStatus(healthStatus, nodeHealthChecker.isHealthy(),
nodeHealthChecker.getHealthReport(),
nodeHealthChecker.getLastHealthReportTime());
LOG.info("Checking initial healthy condition");
// Check proper report conditions.
Assert.assertTrue("Node health status reported unhealthy", healthStatus
.getIsNodeHealthy());
Assert.assertTrue("Node health status reported unhealthy", healthStatus
.getHealthReport().isEmpty());
.getHealthReport().equals(nodeHealthChecker.getHealthReport()));
// write out error file.
// Healthy to unhealthy transition
writeNodeHealthScriptFile(errorScript, true);
// Run timer
timer.run();
timerTask.run();
// update health status
nodeHealthChecker.setHealthStatus(healthStatus);
setHealthStatus(healthStatus, nodeHealthChecker.isHealthy(),
nodeHealthChecker.getHealthReport(),
nodeHealthChecker.getLastHealthReportTime());
LOG.info("Checking Healthy--->Unhealthy");
Assert.assertFalse("Node health status reported healthy", healthStatus
.getIsNodeHealthy());
Assert.assertFalse("Node health status reported healthy", healthStatus
.getHealthReport().isEmpty());
Assert.assertTrue("Node health status reported healthy", healthStatus
.getHealthReport().equals(nodeHealthChecker.getHealthReport()));
// Check unhealthy to healthy transitions.
writeNodeHealthScriptFile(normalScript, true);
timer.run();
nodeHealthChecker.setHealthStatus(healthStatus);
timerTask.run();
setHealthStatus(healthStatus, nodeHealthChecker.isHealthy(),
nodeHealthChecker.getHealthReport(),
nodeHealthChecker.getLastHealthReportTime());
LOG.info("Checking UnHealthy--->healthy");
// Check proper report conditions.
Assert.assertTrue("Node health status reported unhealthy", healthStatus
.getIsNodeHealthy());
Assert.assertTrue("Node health status reported unhealthy", healthStatus
.getHealthReport().isEmpty());
.getHealthReport().equals(nodeHealthChecker.getHealthReport()));
// Healthy to timeout transition.
writeNodeHealthScriptFile(timeOutScript, true);
timer.run();
nodeHealthChecker.setHealthStatus(healthStatus);
timerTask.run();
setHealthStatus(healthStatus, nodeHealthChecker.isHealthy(),
nodeHealthChecker.getHealthReport(),
nodeHealthChecker.getLastHealthReportTime());
LOG.info("Checking Healthy--->timeout");
Assert.assertFalse("Node health status reported healthy even after timeout",
healthStatus.getIsNodeHealthy());
Assert.assertEquals("Node time out message not propogated", healthStatus
.getHealthReport(),
NodeHealthCheckerService.NODE_HEALTH_SCRIPT_TIMED_OUT_MSG);
Assert.assertTrue("Node script time out message not propogated",
healthStatus.getHealthReport().equals(
NodeHealthScriptRunner.NODE_HEALTH_SCRIPT_TIMED_OUT_MSG
+ NodeHealthCheckerService.SEPARATOR
+ nodeHealthChecker.getDiskHandler().getDisksHealthReport()));
}
}

View File

@ -29,7 +29,6 @@
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.NodeHealthCheckerService;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileContext;
import org.apache.hadoop.fs.Path;
@ -440,10 +439,11 @@ protected ContainerManagerImpl createContainerManager(Context context,
ContainerExecutor exec, DeletionService del,
NodeStatusUpdater nodeStatusUpdater,
ContainerTokenSecretManager containerTokenSecretManager,
ApplicationACLsManager aclsManager) {
ApplicationACLsManager aclsManager,
LocalDirsHandlerService diskhandler) {
return new ContainerManagerImpl(context, exec, del,
nodeStatusUpdater, metrics, containerTokenSecretManager,
aclsManager) {
aclsManager, diskhandler) {
@Override
public void start() {
// Simulating failure of starting RPC server

View File

@ -45,7 +45,9 @@
import org.apache.hadoop.yarn.server.nodemanager.Context;
import org.apache.hadoop.yarn.server.nodemanager.DefaultContainerExecutor;
import org.apache.hadoop.yarn.server.nodemanager.DeletionService;
import org.apache.hadoop.yarn.server.nodemanager.LocalDirsHandlerService;
import org.apache.hadoop.yarn.server.nodemanager.LocalRMInterface;
import org.apache.hadoop.yarn.server.nodemanager.NodeHealthCheckerService;
import org.apache.hadoop.yarn.server.nodemanager.NodeManager.NMContext;
import org.apache.hadoop.yarn.server.nodemanager.NodeStatusUpdater;
import org.apache.hadoop.yarn.server.nodemanager.NodeStatusUpdaterImpl;
@ -94,6 +96,8 @@ public BaseContainerManagerTest() throws UnsupportedFileSystemException {
protected ContainerExecutor exec;
protected DeletionService delSrvc;
protected String user = "nobody";
protected NodeHealthCheckerService nodeHealthChecker;
protected LocalDirsHandlerService dirsHandler;
protected NodeStatusUpdater nodeStatusUpdater = new NodeStatusUpdaterImpl(
context, new AsyncDispatcher(), null, metrics, this.containerTokenSecretManager) {
@ -147,9 +151,12 @@ public void delete(String user, Path subDir, Path[] baseDirs) {
delSrvc.init(conf);
exec = createContainerExecutor();
nodeHealthChecker = new NodeHealthCheckerService();
nodeHealthChecker.init(conf);
dirsHandler = nodeHealthChecker.getDiskHandler();
containerManager = new ContainerManagerImpl(context, exec, delSrvc,
nodeStatusUpdater, metrics, this.containerTokenSecretManager,
new ApplicationACLsManager(conf));
new ApplicationACLsManager(conf), dirsHandler);
containerManager.init(conf);
}

View File

@ -383,11 +383,12 @@ public void testLocalFilesCleanup() throws InterruptedException,
// Real del service
delSrvc = new DeletionService(exec);
delSrvc.init(conf);
ContainerTokenSecretManager containerTokenSecretManager = new
ContainerTokenSecretManager();
containerManager = new ContainerManagerImpl(context, exec, delSrvc,
nodeStatusUpdater, metrics, containerTokenSecretManager,
new ApplicationACLsManager(conf));
new ApplicationACLsManager(conf), dirsHandler);
containerManager.init(conf);
containerManager.start();

View File

@ -25,6 +25,7 @@
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import java.io.IOException;
import java.net.URISyntaxException;
import java.nio.ByteBuffer;
import java.util.AbstractMap.SimpleEntry;
@ -649,7 +650,8 @@ public void containerSuccessful() {
public void containerFailed(int exitCode) {
c.handle(new ContainerExitEvent(cId,
ContainerEventType.CONTAINER_EXITED_WITH_FAILURE, exitCode));
ContainerEventType.CONTAINER_EXITED_WITH_FAILURE, exitCode,
"Container completed with exit code " + exitCode));
drainDispatcherEvents();
}
@ -659,9 +661,10 @@ public void killContainer() {
}
public void containerKilledOnRequest() {
int exitCode = ExitCode.FORCE_KILLED.getExitCode();
c.handle(new ContainerExitEvent(cId,
ContainerEventType.CONTAINER_KILLED_ON_REQUEST, ExitCode.FORCE_KILLED
.getExitCode()));
ContainerEventType.CONTAINER_KILLED_ON_REQUEST, exitCode,
"Container completed with exit code " + exitCode));
drainDispatcherEvents();
}

View File

@ -59,6 +59,8 @@
import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor;
import org.apache.hadoop.yarn.server.nodemanager.DeletionService;
import org.apache.hadoop.yarn.server.nodemanager.LocalDirsHandlerService;
import org.apache.hadoop.yarn.server.nodemanager.NodeHealthCheckerService;
import org.apache.hadoop.yarn.server.nodemanager.api.protocolrecords.LocalResourceStatus;
import org.apache.hadoop.yarn.server.nodemanager.api.protocolrecords.LocalizerAction;
import org.apache.hadoop.yarn.server.nodemanager.api.protocolrecords.LocalizerHeartbeatResponse;
@ -109,19 +111,23 @@ public void testLocalizationInit() throws Exception {
doNothing().when(spylfs).mkdir(
isA(Path.class), isA(FsPermission.class), anyBoolean());
List<Path> localDirs = new ArrayList<Path>();
String[] sDirs = new String[4];
for (int i = 0; i < 4; ++i) {
localDirs.add(lfs.makeQualified(new Path(basedir, i + "")));
sDirs[i] = localDirs.get(i).toString();
}
conf.setStrings(YarnConfiguration.NM_LOCAL_DIRS, sDirs);
LocalDirsHandlerService diskhandler = new LocalDirsHandlerService();
diskhandler.init(conf);
ResourceLocalizationService locService =
spy(new ResourceLocalizationService(dispatcher, exec, delService));
spy(new ResourceLocalizationService(dispatcher, exec, delService,
diskhandler));
doReturn(lfs)
.when(locService).getLocalFileContext(isA(Configuration.class));
try {
dispatcher.start();
List<Path> localDirs = new ArrayList<Path>();
String[] sDirs = new String[4];
for (int i = 0; i < 4; ++i) {
localDirs.add(lfs.makeQualified(new Path(basedir, i + "")));
sDirs[i] = localDirs.get(i).toString();
}
conf.setStrings(YarnConfiguration.NM_LOCAL_DIRS, sDirs);
// initialize ResourceLocalizationService
locService.init(conf);
@ -176,12 +182,16 @@ public void testResourceRelease() throws Exception {
dispatcher.register(LocalizerEventType.class, localizerBus);
ContainerExecutor exec = mock(ContainerExecutor.class);
LocalDirsHandlerService dirsHandler = new LocalDirsHandlerService();
dirsHandler.init(conf);
DeletionService delService = new DeletionService(exec);
delService.init(null);
delService.start();
ResourceLocalizationService rawService =
new ResourceLocalizationService(dispatcher, exec, delService);
new ResourceLocalizationService(dispatcher, exec, delService,
dirsHandler);
ResourceLocalizationService spyService = spy(rawService);
doReturn(ignore).when(spyService).createServer();
doReturn(mockLocallilzerTracker).when(spyService).createLocalizerTracker(
@ -356,13 +366,17 @@ public void testLocalizationHeartbeat() throws Exception {
dispatcher.register(ContainerEventType.class, containerBus);
ContainerExecutor exec = mock(ContainerExecutor.class);
LocalDirsHandlerService dirsHandler = new LocalDirsHandlerService();
dirsHandler.init(conf);
DeletionService delServiceReal = new DeletionService(exec);
DeletionService delService = spy(delServiceReal);
delService.init(null);
delService.start();
ResourceLocalizationService rawService =
new ResourceLocalizationService(dispatcher, exec, delService);
new ResourceLocalizationService(dispatcher, exec, delService,
dirsHandler);
ResourceLocalizationService spyService = spy(rawService);
doReturn(ignore).when(spyService).createServer();
doReturn(lfs).when(spyService).getLocalFileContext(isA(Configuration.class));
@ -414,8 +428,9 @@ public boolean matches(Object o) {
String appStr = ConverterUtils.toString(appId);
String ctnrStr = c.getContainerID().toString();
ArgumentCaptor<Path> tokenPathCaptor = ArgumentCaptor.forClass(Path.class);
verify(exec).startLocalizer(tokenPathCaptor.capture(), isA(InetSocketAddress.class),
eq("user0"), eq(appStr), eq(ctnrStr), isA(List.class));
verify(exec).startLocalizer(tokenPathCaptor.capture(),
isA(InetSocketAddress.class), eq("user0"), eq(appStr), eq(ctnrStr),
isA(List.class), isA(List.class));
Path localizationTokenPath = tokenPathCaptor.getValue();
// heartbeat from localizer

View File

@ -122,7 +122,8 @@ public void testLocalFileDeletionAfterUpload() throws IOException {
dispatcher.register(ApplicationEventType.class, appEventHandler);
LogAggregationService logAggregationService =
new LogAggregationService(dispatcher, this.context, this.delSrvc);
new LogAggregationService(dispatcher, this.context, this.delSrvc,
super.dirsHandler);
logAggregationService.init(this.conf);
logAggregationService.start();
@ -189,7 +190,8 @@ public void testNoContainerOnNode() {
dispatcher.register(ApplicationEventType.class, appEventHandler);
LogAggregationService logAggregationService =
new LogAggregationService(dispatcher, this.context, this.delSrvc);
new LogAggregationService(dispatcher, this.context, this.delSrvc,
super.dirsHandler);
logAggregationService.init(this.conf);
logAggregationService.start();
@ -237,7 +239,8 @@ public void testMultipleAppsLogAggregation() throws IOException {
dispatcher.register(ApplicationEventType.class, appEventHandler);
LogAggregationService logAggregationService =
new LogAggregationService(dispatcher, this.context, this.delSrvc);
new LogAggregationService(dispatcher, this.context, this.delSrvc,
super.dirsHandler);
logAggregationService.init(this.conf);
logAggregationService.start();

View File

@ -37,6 +37,7 @@
import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.logaggregation.ContainerLogsRetentionPolicy;
import org.apache.hadoop.yarn.server.nodemanager.DeletionService;
import org.apache.hadoop.yarn.server.nodemanager.LocalDirsHandlerService;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationEventType;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.event.LogHandlerAppFinishedEvent;
@ -74,13 +75,16 @@ public void testLogDeletion() {
EventHandler<ApplicationEvent> appEventHandler = mock(EventHandler.class);
dispatcher.register(ApplicationEventType.class, appEventHandler);
LocalDirsHandlerService dirsHandler = new LocalDirsHandlerService();
dirsHandler.init(conf);
ApplicationId appId1 = BuilderUtils.newApplicationId(1234, 1);
ApplicationAttemptId appAttemptId1 =
BuilderUtils.newApplicationAttemptId(appId1, 1);
ContainerId container11 = BuilderUtils.newContainerId(appAttemptId1, 1);
NonAggregatingLogHandler logHandler =
new NonAggregatingLogHandler(dispatcher, delService);
new NonAggregatingLogHandler(dispatcher, delService, dirsHandler);
logHandler.init(conf);
logHandler.start();
@ -146,13 +150,17 @@ public void testDelayedDelete() {
EventHandler<ApplicationEvent> appEventHandler = mock(EventHandler.class);
dispatcher.register(ApplicationEventType.class, appEventHandler);
LocalDirsHandlerService dirsHandler = new LocalDirsHandlerService();
dirsHandler.init(conf);
ApplicationId appId1 = BuilderUtils.newApplicationId(1234, 1);
ApplicationAttemptId appAttemptId1 =
BuilderUtils.newApplicationAttemptId(appId1, 1);
ContainerId container11 = BuilderUtils.newContainerId(appAttemptId1, 1);
NonAggregatingLogHandler logHandler =
new NonAggregatingLogHandlerWithMockExecutor(dispatcher, delService);
new NonAggregatingLogHandlerWithMockExecutor(dispatcher, delService,
dirsHandler);
logHandler.init(conf);
logHandler.start();
@ -182,8 +190,8 @@ private class NonAggregatingLogHandlerWithMockExecutor extends
private ScheduledThreadPoolExecutor mockSched;
public NonAggregatingLogHandlerWithMockExecutor(Dispatcher dispatcher,
DeletionService delService) {
super(dispatcher, delService);
DeletionService delService, LocalDirsHandlerService dirsHandler) {
super(dispatcher, delService, dirsHandler);
}
@Override

View File

@ -27,6 +27,7 @@
import java.io.Writer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ContainerId;
@ -37,6 +38,8 @@
import org.apache.hadoop.yarn.factories.RecordFactory;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import org.apache.hadoop.yarn.server.nodemanager.Context;
import org.apache.hadoop.yarn.server.nodemanager.LocalDirsHandlerService;
import org.apache.hadoop.yarn.server.nodemanager.NodeHealthCheckerService;
import org.apache.hadoop.yarn.server.nodemanager.NodeManager;
import org.apache.hadoop.yarn.server.nodemanager.ResourceView;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.Application;
@ -47,6 +50,7 @@
import org.apache.hadoop.yarn.server.security.ApplicationACLsManager;
import org.apache.hadoop.yarn.util.BuilderUtils;
import org.apache.hadoop.yarn.util.ConverterUtils;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
@ -54,10 +58,19 @@ public class TestNMWebServer {
private static final File testRootDir = new File("target",
TestNMWebServer.class.getSimpleName());
private static File testLogDir = new File("target",
TestNMWebServer.class.getSimpleName() + "LogDir");
@Before
public void setup() {
testRootDir.mkdirs();
testLogDir.mkdir();
}
@After
public void tearDown() {
FileUtil.fullyDelete(testRootDir);
FileUtil.fullyDelete(testLogDir);
}
@Test
@ -74,9 +87,14 @@ public long getPmemAllocatedForContainers() {
}
};
Configuration conf = new Configuration();
WebServer server = new WebServer(nmContext, resourceView,
new ApplicationACLsManager(conf));
conf.set(YarnConfiguration.NM_LOCAL_DIRS, testRootDir.getAbsolutePath());
conf.set(YarnConfiguration.NM_LOG_DIRS, testLogDir.getAbsolutePath());
NodeHealthCheckerService healthChecker = new NodeHealthCheckerService();
healthChecker.init(conf);
LocalDirsHandlerService dirsHandler = healthChecker.getDiskHandler();
WebServer server = new WebServer(nmContext, resourceView,
new ApplicationACLsManager(conf), dirsHandler);
server.init(conf);
server.start();
@ -119,20 +137,20 @@ public ContainerState getContainerState() {
containerId.getApplicationAttemptId().getApplicationId();
nmContext.getApplications().get(applicationId).getContainers()
.put(containerId, container);
writeContainerLogs(conf, nmContext, containerId);
writeContainerLogs(nmContext, containerId, dirsHandler);
}
// TODO: Pull logs and test contents.
// Thread.sleep(1000000);
}
private void writeContainerLogs(Configuration conf, Context nmContext,
ContainerId containerId)
private void writeContainerLogs(Context nmContext,
ContainerId containerId, LocalDirsHandlerService dirsHandler)
throws IOException {
// ContainerLogDir should be created
File containerLogDir =
ContainerLogsPage.ContainersLogsBlock.getContainerLogDirs(conf,
containerId).get(0);
ContainerLogsPage.ContainersLogsBlock.getContainerLogDirs(containerId,
dirsHandler).get(0);
containerLogDir.mkdirs();
for (String fileType : new String[] { "stdout", "stderr", "syslog" }) {
Writer writer = new FileWriter(new File(containerLogDir, fileType));

View File

@ -23,7 +23,6 @@
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.NodeHealthCheckerService;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileContext;
import org.apache.hadoop.fs.Path;
@ -41,6 +40,7 @@
import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerRequest;
import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerResponse;
import org.apache.hadoop.yarn.server.nodemanager.Context;
import org.apache.hadoop.yarn.server.nodemanager.NodeHealthCheckerService;
import org.apache.hadoop.yarn.server.nodemanager.NodeManager;
import org.apache.hadoop.yarn.server.nodemanager.NodeStatusUpdater;
import org.apache.hadoop.yarn.server.nodemanager.NodeStatusUpdaterImpl;
@ -51,7 +51,6 @@
import org.apache.hadoop.yarn.server.security.ContainerTokenSecretManager;
import org.apache.hadoop.yarn.service.AbstractService;
import org.apache.hadoop.yarn.service.CompositeService;
import org.apache.hadoop.yarn.service.Service.STATE;
public class MiniYARNCluster extends CompositeService {
@ -69,13 +68,23 @@ public class MiniYARNCluster extends CompositeService {
private File testWorkDir;
public MiniYARNCluster(String testName) {
//default number of nodeManagers = 1
this(testName, 1);
}
// Number of nm-local-dirs per nodemanager
private int numLocalDirs;
// Number of nm-log-dirs per nodemanager
private int numLogDirs;
/**
* @param testName name of the test
* @param noOfNodeManagers the number of node managers in the cluster
* @param numLocalDirs the number of nm-local-dirs per nodemanager
* @param numLogDirs the number of nm-log-dirs per nodemanager
*/
public MiniYARNCluster(String testName, int noOfNodeManagers,
int numLocalDirs, int numLogDirs) {
public MiniYARNCluster(String testName, int noOfNodeManagers) {
super(testName);
this.numLocalDirs = numLocalDirs;
this.numLogDirs = numLogDirs;
this.testWorkDir = new File("target", testName);
try {
FileContext.getLocalFSFileContext().delete(
@ -166,25 +175,39 @@ public synchronized void init(Configuration conf) {
super.init(config);
}
/**
* Create local/log directories
* @param dirType type of directories i.e. local dirs or log dirs
* @param numDirs number of directories
* @return the created directories as a comma delimited String
*/
private String prepareDirs(String dirType, int numDirs) {
File []dirs = new File[numDirs];
String dirsString = "";
for (int i = 0; i < numDirs; i++) {
dirs[i]= new File(testWorkDir, MiniYARNCluster.this.getName()
+ "-" + dirType + "Dir-nm-" + index + "_" + i);
dirs[i].mkdir();
LOG.info("Created " + dirType + "Dir in " + dirs[i].getAbsolutePath());
String delimiter = (i > 0) ? "," : "";
dirsString = dirsString.concat(delimiter + dirs[i].getAbsolutePath());
}
return dirsString;
}
public synchronized void start() {
try {
File localDir = new File(testWorkDir, MiniYARNCluster.this.getName()
+ "-localDir-nm-" + index);
localDir.mkdir();
LOG.info("Created localDir in " + localDir.getAbsolutePath());
getConfig().set(YarnConfiguration.NM_LOCAL_DIRS,
localDir.getAbsolutePath());
File logDir =
new File(testWorkDir, MiniYARNCluster.this.getName()
+ "-logDir-nm-" + index);
// create nm-local-dirs and configure them for the nodemanager
String localDirsString = prepareDirs("local", numLocalDirs);
getConfig().set(YarnConfiguration.NM_LOCAL_DIRS, localDirsString);
// create nm-log-dirs and configure them for the nodemanager
String logDirsString = prepareDirs("log", numLogDirs);
getConfig().set(YarnConfiguration.NM_LOG_DIRS, logDirsString);
File remoteLogDir =
new File(testWorkDir, MiniYARNCluster.this.getName()
+ "-remoteLogDir-nm-" + index);
logDir.mkdir();
remoteLogDir.mkdir();
LOG.info("Created logDir in " + logDir.getAbsolutePath());
getConfig().set(YarnConfiguration.NM_LOG_DIRS,
logDir.getAbsolutePath());
getConfig().set(YarnConfiguration.NM_REMOTE_APP_LOG_DIR,
remoteLogDir.getAbsolutePath());
// By default AM + 2 containers

View File

@ -117,7 +117,7 @@ public static void setup() throws AccessControlException,
conf.setLong(YarnConfiguration.RM_AM_EXPIRY_INTERVAL_MS, 100000L);
UserGroupInformation.setConfiguration(conf);
yarnCluster = new MiniYARNCluster(TestContainerManagerSecurity.class
.getName());
.getName(), 1, 1, 1);
yarnCluster.init(conf);
yarnCluster.start();
}

View File

@ -0,0 +1,247 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileContext;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.UnsupportedFileSystemException;
import org.apache.hadoop.security.AccessControlException;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.server.MiniYARNCluster;
import org.apache.hadoop.yarn.server.nodemanager.LocalDirsHandlerService;
import org.apache.hadoop.yarn.server.nodemanager.NodeManager;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
import java.io.File;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.util.Iterator;
import java.util.List;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
import junit.framework.Assert;
/**
* Verify if NodeManager's in-memory good local dirs list and good log dirs list
* get updated properly when disks(nm-local-dirs and nm-log-dirs) fail. Also
* verify if the overall health status of the node gets updated properly when
* specified percentage of disks fail.
*/
public class TestDiskFailures {
private static final Log LOG = LogFactory.getLog(TestDiskFailures.class);
private static final long DISK_HEALTH_CHECK_INTERVAL = 1000;//1 sec
private static FileContext localFS = null;
private static final File testDir = new File("target",
TestDiskFailures.class.getName()).getAbsoluteFile();
private static final File localFSDirBase = new File(testDir,
TestDiskFailures.class.getName() + "-localDir");
private static final int numLocalDirs = 4;
private static final int numLogDirs = 4;
private static MiniYARNCluster yarnCluster;
LocalDirsHandlerService dirsHandler;
@BeforeClass
public static void setup() throws AccessControlException,
FileNotFoundException, UnsupportedFileSystemException, IOException {
localFS = FileContext.getLocalFSFileContext();
localFS.delete(new Path(localFSDirBase.getAbsolutePath()), true);
localFSDirBase.mkdirs();
// Do not start cluster here
}
@AfterClass
public static void teardown() {
if (yarnCluster != null) {
yarnCluster.stop();
yarnCluster = null;
}
FileUtil.fullyDelete(localFSDirBase);
}
/**
* Make local-dirs fail/inaccessible and verify if NodeManager can
* recognize the disk failures properly and can update the list of
* local-dirs accordingly with good disks. Also verify the overall
* health status of the node.
* @throws IOException
*/
@Test
public void testLocalDirsFailures() throws IOException {
testDirsFailures(true);
}
/**
* Make log-dirs fail/inaccessible and verify if NodeManager can
* recognize the disk failures properly and can update the list of
* log-dirs accordingly with good disks. Also verify the overall health
* status of the node.
* @throws IOException
*/
@Test
public void testLogDirsFailures() throws IOException {
testDirsFailures(false);
}
private void testDirsFailures(boolean localORLogDirs) throws IOException {
String dirType = localORLogDirs ? "local" : "log";
String dirsProperty = localORLogDirs ? YarnConfiguration.NM_LOCAL_DIRS
: YarnConfiguration.NM_LOG_DIRS;
Configuration conf = new Configuration();
// set disk health check interval to a small value (say 1 sec).
conf.setLong(YarnConfiguration.NM_DISK_HEALTH_CHECK_INTERVAL_MS,
DISK_HEALTH_CHECK_INTERVAL);
// If 2 out of the total 4 local-dirs fail OR if 2 Out of the total 4
// log-dirs fail, then the node's health status should become unhealthy.
conf.setFloat(YarnConfiguration.NM_MIN_HEALTHY_DISKS_FRACTION, 0.60F);
if (yarnCluster != null) {
yarnCluster.stop();
FileUtil.fullyDelete(localFSDirBase);
localFSDirBase.mkdirs();
}
LOG.info("Starting up YARN cluster");
yarnCluster = new MiniYARNCluster(TestDiskFailures.class.getName(),
1, numLocalDirs, numLogDirs);
yarnCluster.init(conf);
yarnCluster.start();
NodeManager nm = yarnCluster.getNodeManager(0);
LOG.info("Configured nm-" + dirType + "-dirs="
+ nm.getConfig().get(dirsProperty));
dirsHandler = nm.getNodeHealthChecker().getDiskHandler();
List<String> list = localORLogDirs ? dirsHandler.getLocalDirs()
: dirsHandler.getLogDirs();
String[] dirs = list.toArray(new String[list.size()]);
Assert.assertEquals("Number of nm-" + dirType + "-dirs is wrong.",
numLocalDirs, dirs.length);
String expectedDirs = StringUtils.join(",", list);
// validate the health of disks initially
verifyDisksHealth(localORLogDirs, expectedDirs, true);
// Make 1 nm-local-dir fail and verify if "the nodemanager can identify
// the disk failure(s) and can update the list of good nm-local-dirs.
prepareDirToFail(dirs[2]);
expectedDirs = dirs[0] + "," + dirs[1] + ","
+ dirs[3];
verifyDisksHealth(localORLogDirs, expectedDirs, true);
// Now, make 1 more nm-local-dir/nm-log-dir fail and verify if "the
// nodemanager can identify the disk failures and can update the list of
// good nm-local-dirs/nm-log-dirs and can update the overall health status
// of the node to unhealthy".
prepareDirToFail(dirs[0]);
expectedDirs = dirs[1] + "," + dirs[3];
verifyDisksHealth(localORLogDirs, expectedDirs, false);
// Fail the remaining 2 local-dirs/log-dirs and verify if NM remains with
// empty list of local-dirs/log-dirs and the overall health status is
// unhealthy.
prepareDirToFail(dirs[1]);
prepareDirToFail(dirs[3]);
expectedDirs = "";
verifyDisksHealth(localORLogDirs, expectedDirs, false);
}
/**
* Wait for the NodeManger to go for the disk-health-check at least once.
*/
private void waitForDiskHealthCheck() {
long lastDisksCheckTime = dirsHandler.getLastDisksCheckTime();
long time = lastDisksCheckTime;
for (int i = 0; i < 10 && (time <= lastDisksCheckTime); i++) {
try {
Thread.sleep(1000);
} catch(InterruptedException e) {
LOG.error(
"Interrupted while waiting for NodeManager's disk health check.");
}
time = dirsHandler.getLastDisksCheckTime();
}
}
/**
* Verify if the NodeManager could identify disk failures.
* @param localORLogDirs <em>true</em> represent nm-local-dirs and <em>false
* </em> means nm-log-dirs
* @param expectedDirs expected nm-local-dirs/nm-log-dirs as a string
* @param isHealthy <em>true</em> if the overall node should be healthy
*/
private void verifyDisksHealth(boolean localORLogDirs, String expectedDirs,
boolean isHealthy) {
// Wait for the NodeManager to identify disk failures.
waitForDiskHealthCheck();
List<String> list = localORLogDirs ? dirsHandler.getLocalDirs()
: dirsHandler.getLogDirs();
String seenDirs = StringUtils.join(",", list);
LOG.info("ExpectedDirs=" + expectedDirs);
LOG.info("SeenDirs=" + seenDirs);
Assert.assertTrue("NodeManager could not identify disk failure.",
expectedDirs.equals(seenDirs));
Assert.assertEquals("Node's health in terms of disks is wrong",
isHealthy, dirsHandler.areDisksHealthy());
for (int i = 0; i < 10; i++) {
Iterator<RMNode> iter = yarnCluster.getResourceManager().getRMContext()
.getRMNodes().values().iterator();
if (iter.next().getNodeHealthStatus().getIsNodeHealthy() == isHealthy) {
break;
}
// wait for the node health info to go to RM
try {
Thread.sleep(1000);
} catch(InterruptedException e) {
LOG.error("Interrupted while waiting for NM->RM heartbeat.");
}
}
Iterator<RMNode> iter = yarnCluster.getResourceManager().getRMContext()
.getRMNodes().values().iterator();
Assert.assertEquals("RM is not updated with the health status of a node",
isHealthy, iter.next().getNodeHealthStatus().getIsNodeHealthy());
}
/**
* Prepare directory for a failure: Replace the given directory on the
* local FileSystem with a regular file with the same name.
* This would cause failure of creation of directory in DiskChecker.checkDir()
* with the same name.
* @param dir the directory to be failed
* @throws IOException
*/
private void prepareDirToFail(String dir) throws IOException {
File file = new File(dir);
FileUtil.fullyDelete(file);
file.createNewFile();
LOG.info("Prepared " + dir + " to fail.");
}
}

View File

@ -398,6 +398,15 @@ Hadoop MapReduce Next Generation - Cluster Setup
| | | Timeout for health script execution. |
*-------------------------+-------------------------+------------------------+
The health checker script is not supposed to give ERROR if only some of the
local disks become bad. NodeManager has the ability to periodically check
the health of the local disks (specifically checks nodemanager-local-dirs
and nodemanager-log-dirs) and after reaching the threshold of number of
bad directories based on the value set for the config property
yarn.nodemanager.disk-health-checker.min-healthy-disks. The boot disk is
either raided or a failure in the boot disk is identified by the health
checker script.
* {Slaves file}
Typically you choose one machine in the cluster to act as the NameNode and
@ -874,13 +883,6 @@ KVNO Timestamp Principal
*-------------------------+-------------------------+------------------------+
|| Parameter || Value || Notes |
*-------------------------+-------------------------+------------------------+
| <<<yarn.nodemanager.local-dirs>>> | |
| | Comma-separated list of NodeManager local directories. | |
| | | Paths to NodeManager local directories. Should be same as the value |
| | | which was provided to key in <<<conf/yarn-site.xml>>>. This is |
| | | required to validate paths passed to the setuid executable in order |
| | to prevent arbitrary paths being passed to it. |
*-------------------------+-------------------------+------------------------+
| <<<yarn.nodemanager.linux-container-executor.group>>> | <hadoop> | |
| | | Unix group of the NodeManager. The group owner of the |
| | |<container-executor> binary should be this group. Should be same as the |
@ -888,14 +890,6 @@ KVNO Timestamp Principal
| | | required for validating the secure access of the <container-executor> |
| | | binary. |
*-------------------------+-------------------------+------------------------+
| <<<yarn.nodemanager.log-dirs>>> | |
| | Comma-separated list of NodeManager log directories. | |
| | | Paths to NodeManager log directories. Should be same as the value |
| | | which was provided to key in <<<conf/yarn-site.xml>>>. This is |
| | | required to set proper permissions on the log files so that they can |
| | | be written to by the user's containers and read by the NodeManager for |
| | | <log aggregation>. |
*-------------------------+-------------------------+------------------------+
| <<<banned.users>>> | hfds,yarn,mapred,bin | Banned users. |
*-------------------------+-------------------------+------------------------+
| <<<min.user.id>>> | 1000 | Prevent other super-users. |