YARN-3. Add support for CPU isolation/monitoring of containers. (adferguson via tucu)
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1423706 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
371abd6e21
commit
5032a694ed
|
@ -6,6 +6,9 @@ Trunk - Unreleased
|
||||||
|
|
||||||
NEW FEATURES
|
NEW FEATURES
|
||||||
|
|
||||||
|
YARN-3. Add support for CPU isolation/monitoring of containers.
|
||||||
|
(adferguson via tucu)
|
||||||
|
|
||||||
IMPROVEMENTS
|
IMPROVEMENTS
|
||||||
|
|
||||||
YARN-84. Use Builder to build RPC server. (Brandon Li via suresh)
|
YARN-84. Use Builder to build RPC server. (Brandon Li via suresh)
|
||||||
|
|
|
@ -237,4 +237,11 @@
|
||||||
<Bug pattern="EI_EXPOSE_REP2" />
|
<Bug pattern="EI_EXPOSE_REP2" />
|
||||||
</Match>
|
</Match>
|
||||||
|
|
||||||
|
<!-- /proc/mounts is always in the same place -->
|
||||||
|
<Match>
|
||||||
|
<Class name="org.apache.hadoop.yarn.server.nodemanager.util.CgroupsLCEResourcesHandler" />
|
||||||
|
<Method name="parseMtab" />
|
||||||
|
<Bug pattern="DMI_HARDCODED_ABSOLUTE_FILENAME" />
|
||||||
|
</Match>
|
||||||
|
|
||||||
</FindBugsFilter>
|
</FindBugsFilter>
|
||||||
|
|
|
@ -481,6 +481,24 @@ public class YarnConfiguration extends Configuration {
|
||||||
public static final String NM_LINUX_CONTAINER_GROUP =
|
public static final String NM_LINUX_CONTAINER_GROUP =
|
||||||
NM_PREFIX + "linux-container-executor.group";
|
NM_PREFIX + "linux-container-executor.group";
|
||||||
|
|
||||||
|
/** The type of resource enforcement to use with the
|
||||||
|
* linux container executor.
|
||||||
|
*/
|
||||||
|
public static final String NM_LINUX_CONTAINER_RESOURCES_HANDLER =
|
||||||
|
NM_PREFIX + "linux-container-executor.resources-handler.class";
|
||||||
|
|
||||||
|
/** The path the linux container executor should use for cgroups */
|
||||||
|
public static final String NM_LINUX_CONTAINER_CGROUPS_HIERARCHY =
|
||||||
|
NM_PREFIX + "linux-container-executor.cgroups.hierarchy";
|
||||||
|
|
||||||
|
/** Whether the linux container executor should mount cgroups if not found */
|
||||||
|
public static final String NM_LINUX_CONTAINER_CGROUPS_MOUNT =
|
||||||
|
NM_PREFIX + "linux-container-executor.cgroups.mount";
|
||||||
|
|
||||||
|
/** Where the linux container executor should mount cgroups if not found */
|
||||||
|
public static final String NM_LINUX_CONTAINER_CGROUPS_MOUNT_PATH =
|
||||||
|
NM_PREFIX + "linux-container-executor.cgroups.mount-path";
|
||||||
|
|
||||||
/** T-file compression types used to compress aggregated logs.*/
|
/** T-file compression types used to compress aggregated logs.*/
|
||||||
public static final String NM_LOG_AGG_COMPRESSION_TYPE =
|
public static final String NM_LOG_AGG_COMPRESSION_TYPE =
|
||||||
NM_PREFIX + "log-aggregation.compression-type";
|
NM_PREFIX + "log-aggregation.compression-type";
|
||||||
|
|
|
@ -477,6 +477,39 @@
|
||||||
<name>yarn.nodemanager.linux-container-executor.path</name>
|
<name>yarn.nodemanager.linux-container-executor.path</name>
|
||||||
</property>
|
</property>
|
||||||
|
|
||||||
|
<property>
|
||||||
|
<description>The class which should help the LCE handle resources.</description>
|
||||||
|
<name>yarn.nodemanager.linux-container-executor.resources-handler.class</name>
|
||||||
|
<value>org.apache.hadoop.yarn.server.nodemanager.util.DefaultLCEResourcesHandler</value>
|
||||||
|
<!-- <value>org.apache.hadoop.yarn.server.nodemanager.util.CgroupsLCEResourcesHandler</value> -->
|
||||||
|
</property>
|
||||||
|
|
||||||
|
<property>
|
||||||
|
<description>The cgroups hierarchy under which to place YARN proccesses (cannot contain commas).
|
||||||
|
If yarn.nodemanager.linux-container-executor.cgroups.mount is false (that is, if cgroups have
|
||||||
|
been pre-configured), then this cgroups hierarchy must already exist and be writable by the
|
||||||
|
NodeManager user, otherwise the NodeManager may fail.
|
||||||
|
Only used when the LCE resources handler is set to the CgroupsLCEResourcesHandler.</description>
|
||||||
|
<name>yarn.nodemanager.linux-container-executor.cgroups.hierarchy</name>
|
||||||
|
<value>/hadoop-yarn</value>
|
||||||
|
</property>
|
||||||
|
|
||||||
|
<property>
|
||||||
|
<description>Whether the LCE should attempt to mount cgroups if not found.
|
||||||
|
Only used when the LCE resources handler is set to the CgroupsLCEResourcesHandler.</description>
|
||||||
|
<name>yarn.nodemanager.linux-container-executor.cgroups.mount</name>
|
||||||
|
<value>false</value>
|
||||||
|
</property>
|
||||||
|
|
||||||
|
<property>
|
||||||
|
<description>Where the LCE should attempt to mount cgroups if not found. Common locations
|
||||||
|
include /sys/fs/cgroup and /cgroup; the default location can vary depending on the Linux
|
||||||
|
distribution in use. This path must exist before the NodeManager is launched.
|
||||||
|
Only used when the LCE resources handler is set to the CgroupsLCEResourcesHandler, and
|
||||||
|
yarn.nodemanager.linux-container-executor.cgroups.mount is true.</description>
|
||||||
|
<name>yarn.nodemanager.linux-container-executor.cgroups.mount-path</name>
|
||||||
|
</property>
|
||||||
|
|
||||||
<property>
|
<property>
|
||||||
<description>T-file compression types used to compress aggregated logs.</description>
|
<description>T-file compression types used to compress aggregated logs.</description>
|
||||||
<name>yarn.nodemanager.log-aggregation.compression-type</name>
|
<name>yarn.nodemanager.log-aggregation.compression-type</name>
|
||||||
|
|
|
@ -29,6 +29,7 @@ import org.apache.commons.logging.Log;
|
||||||
import org.apache.commons.logging.LogFactory;
|
import org.apache.commons.logging.LogFactory;
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
import org.apache.hadoop.fs.Path;
|
import org.apache.hadoop.fs.Path;
|
||||||
|
import org.apache.hadoop.util.ReflectionUtils;
|
||||||
import org.apache.hadoop.util.Shell.ExitCodeException;
|
import org.apache.hadoop.util.Shell.ExitCodeException;
|
||||||
import org.apache.hadoop.util.Shell.ShellCommandExecutor;
|
import org.apache.hadoop.util.Shell.ShellCommandExecutor;
|
||||||
import org.apache.hadoop.util.StringUtils;
|
import org.apache.hadoop.util.StringUtils;
|
||||||
|
@ -38,6 +39,8 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
|
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
|
||||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerDiagnosticsUpdateEvent;
|
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerDiagnosticsUpdateEvent;
|
||||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ContainerLocalizer;
|
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ContainerLocalizer;
|
||||||
|
import org.apache.hadoop.yarn.server.nodemanager.util.DefaultLCEResourcesHandler;
|
||||||
|
import org.apache.hadoop.yarn.server.nodemanager.util.LCEResourcesHandler;
|
||||||
import org.apache.hadoop.yarn.util.ConverterUtils;
|
import org.apache.hadoop.yarn.util.ConverterUtils;
|
||||||
|
|
||||||
public class LinuxContainerExecutor extends ContainerExecutor {
|
public class LinuxContainerExecutor extends ContainerExecutor {
|
||||||
|
@ -46,11 +49,18 @@ public class LinuxContainerExecutor extends ContainerExecutor {
|
||||||
.getLog(LinuxContainerExecutor.class);
|
.getLog(LinuxContainerExecutor.class);
|
||||||
|
|
||||||
private String containerExecutorExe;
|
private String containerExecutorExe;
|
||||||
|
private LCEResourcesHandler resourcesHandler;
|
||||||
|
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void setConf(Configuration conf) {
|
public void setConf(Configuration conf) {
|
||||||
super.setConf(conf);
|
super.setConf(conf);
|
||||||
containerExecutorExe = getContainerExecutorExecutablePath(conf);
|
containerExecutorExe = getContainerExecutorExecutablePath(conf);
|
||||||
|
|
||||||
|
resourcesHandler = ReflectionUtils.newInstance(
|
||||||
|
conf.getClass(YarnConfiguration.NM_LINUX_CONTAINER_RESOURCES_HANDLER,
|
||||||
|
DefaultLCEResourcesHandler.class, LCEResourcesHandler.class), conf);
|
||||||
|
resourcesHandler.setConf(conf);
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -81,7 +91,8 @@ public class LinuxContainerExecutor extends ContainerExecutor {
|
||||||
UNABLE_TO_EXECUTE_CONTAINER_SCRIPT(7),
|
UNABLE_TO_EXECUTE_CONTAINER_SCRIPT(7),
|
||||||
INVALID_CONTAINER_PID(9),
|
INVALID_CONTAINER_PID(9),
|
||||||
INVALID_CONTAINER_EXEC_PERMISSIONS(22),
|
INVALID_CONTAINER_EXEC_PERMISSIONS(22),
|
||||||
INVALID_CONFIG_FILE(24);
|
INVALID_CONFIG_FILE(24),
|
||||||
|
WRITE_CGROUP_FAILED(27);
|
||||||
|
|
||||||
private final int value;
|
private final int value;
|
||||||
ResultCode(int value) {
|
ResultCode(int value) {
|
||||||
|
@ -124,6 +135,8 @@ public class LinuxContainerExecutor extends ContainerExecutor {
|
||||||
throw new IOException("Linux container executor not configured properly"
|
throw new IOException("Linux container executor not configured properly"
|
||||||
+ " (error=" + exitCode + ")", e);
|
+ " (error=" + exitCode + ")", e);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
resourcesHandler.init(this);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -189,6 +202,11 @@ public class LinuxContainerExecutor extends ContainerExecutor {
|
||||||
ContainerId containerId = container.getContainerID();
|
ContainerId containerId = container.getContainerID();
|
||||||
String containerIdStr = ConverterUtils.toString(containerId);
|
String containerIdStr = ConverterUtils.toString(containerId);
|
||||||
|
|
||||||
|
resourcesHandler.preExecute(containerId,
|
||||||
|
container.getLaunchContext().getResource());
|
||||||
|
String resourcesOptions = resourcesHandler.getResourcesOption(
|
||||||
|
containerId);
|
||||||
|
|
||||||
ShellCommandExecutor shExec = null;
|
ShellCommandExecutor shExec = null;
|
||||||
|
|
||||||
try {
|
try {
|
||||||
|
@ -202,7 +220,8 @@ public class LinuxContainerExecutor extends ContainerExecutor {
|
||||||
nmPrivateTokensPath.toUri().getPath().toString(),
|
nmPrivateTokensPath.toUri().getPath().toString(),
|
||||||
pidFilePath.toString(),
|
pidFilePath.toString(),
|
||||||
StringUtils.join(",", localDirs),
|
StringUtils.join(",", localDirs),
|
||||||
StringUtils.join(",", logDirs)));
|
StringUtils.join(",", logDirs),
|
||||||
|
resourcesOptions));
|
||||||
String[] commandArray = command.toArray(new String[command.size()]);
|
String[] commandArray = command.toArray(new String[command.size()]);
|
||||||
shExec = new ShellCommandExecutor(commandArray, null, // NM's cwd
|
shExec = new ShellCommandExecutor(commandArray, null, // NM's cwd
|
||||||
container.getLaunchContext().getEnvironment()); // sanitized env
|
container.getLaunchContext().getEnvironment()); // sanitized env
|
||||||
|
@ -241,7 +260,7 @@ public class LinuxContainerExecutor extends ContainerExecutor {
|
||||||
}
|
}
|
||||||
return exitCode;
|
return exitCode;
|
||||||
} finally {
|
} finally {
|
||||||
; //
|
resourcesHandler.postExecute(containerId);
|
||||||
}
|
}
|
||||||
if (LOG.isDebugEnabled()) {
|
if (LOG.isDebugEnabled()) {
|
||||||
LOG.debug("Output from LinuxContainerExecutor's launchContainer follows:");
|
LOG.debug("Output from LinuxContainerExecutor's launchContainer follows:");
|
||||||
|
@ -316,4 +335,27 @@ public class LinuxContainerExecutor extends ContainerExecutor {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public void mountCgroups(List<String> cgroupKVs, String hierarchy)
|
||||||
|
throws IOException {
|
||||||
|
List<String> command = new ArrayList<String>(
|
||||||
|
Arrays.asList(containerExecutorExe, "--mount-cgroups", hierarchy));
|
||||||
|
command.addAll(cgroupKVs);
|
||||||
|
|
||||||
|
String[] commandArray = command.toArray(new String[command.size()]);
|
||||||
|
ShellCommandExecutor shExec = new ShellCommandExecutor(commandArray);
|
||||||
|
|
||||||
|
if (LOG.isDebugEnabled()) {
|
||||||
|
LOG.debug("mountCgroups: " + Arrays.toString(commandArray));
|
||||||
|
}
|
||||||
|
|
||||||
|
try {
|
||||||
|
shExec.execute();
|
||||||
|
} catch (IOException e) {
|
||||||
|
int ret_code = shExec.getExitCode();
|
||||||
|
logOutput(shExec.getOutput());
|
||||||
|
throw new IOException("Problem mounting cgroups " + cgroupKVs +
|
||||||
|
"; exit code = " + ret_code, e);
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -0,0 +1,321 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.apache.hadoop.yarn.server.nodemanager.util;
|
||||||
|
|
||||||
|
import java.io.BufferedReader;
|
||||||
|
import java.io.File;
|
||||||
|
import java.io.FileReader;
|
||||||
|
import java.io.FileWriter;
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.util.ArrayList;
|
||||||
|
import java.util.Arrays;
|
||||||
|
import java.util.HashMap;
|
||||||
|
import java.util.List;
|
||||||
|
import java.util.Map;
|
||||||
|
import java.util.Map.Entry;
|
||||||
|
import java.util.regex.Matcher;
|
||||||
|
import java.util.regex.Pattern;
|
||||||
|
|
||||||
|
import org.apache.commons.logging.Log;
|
||||||
|
import org.apache.commons.logging.LogFactory;
|
||||||
|
import org.apache.hadoop.conf.Configuration;
|
||||||
|
import org.apache.hadoop.yarn.api.records.ContainerId;
|
||||||
|
import org.apache.hadoop.yarn.api.records.Resource;
|
||||||
|
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||||
|
import org.apache.hadoop.yarn.server.nodemanager.LinuxContainerExecutor;
|
||||||
|
|
||||||
|
public class CgroupsLCEResourcesHandler implements LCEResourcesHandler {
|
||||||
|
|
||||||
|
final static Log LOG = LogFactory
|
||||||
|
.getLog(CgroupsLCEResourcesHandler.class);
|
||||||
|
|
||||||
|
private Configuration conf;
|
||||||
|
private String cgroupPrefix;
|
||||||
|
private boolean cgroupMount;
|
||||||
|
private String cgroupMountPath;
|
||||||
|
|
||||||
|
private boolean cpuWeightEnabled = true;
|
||||||
|
|
||||||
|
private final String MTAB_FILE = "/proc/mounts";
|
||||||
|
private final String CGROUPS_FSTYPE = "cgroup";
|
||||||
|
private final String CONTROLLER_CPU = "cpu";
|
||||||
|
private final int CPU_DEFAULT_WEIGHT = 1024; // set by kernel
|
||||||
|
private final Map<String, String> controllerPaths; // Controller -> path
|
||||||
|
|
||||||
|
public CgroupsLCEResourcesHandler() {
|
||||||
|
this.controllerPaths = new HashMap<String, String>();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void setConf(Configuration conf) {
|
||||||
|
this.conf = conf;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Configuration getConf() {
|
||||||
|
return conf;
|
||||||
|
}
|
||||||
|
|
||||||
|
public synchronized void init(LinuxContainerExecutor lce) throws IOException {
|
||||||
|
|
||||||
|
this.cgroupPrefix = conf.get(YarnConfiguration.
|
||||||
|
NM_LINUX_CONTAINER_CGROUPS_HIERARCHY, "/hadoop-yarn");
|
||||||
|
this.cgroupMount = conf.getBoolean(YarnConfiguration.
|
||||||
|
NM_LINUX_CONTAINER_CGROUPS_MOUNT, false);
|
||||||
|
this.cgroupMountPath = conf.get(YarnConfiguration.
|
||||||
|
NM_LINUX_CONTAINER_CGROUPS_MOUNT_PATH, null);
|
||||||
|
|
||||||
|
// remove extra /'s at end or start of cgroupPrefix
|
||||||
|
if (cgroupPrefix.charAt(0) == '/') {
|
||||||
|
cgroupPrefix = cgroupPrefix.substring(1);
|
||||||
|
}
|
||||||
|
|
||||||
|
int len = cgroupPrefix.length();
|
||||||
|
if (cgroupPrefix.charAt(len - 1) == '/') {
|
||||||
|
cgroupPrefix = cgroupPrefix.substring(0, len - 1);
|
||||||
|
}
|
||||||
|
|
||||||
|
// mount cgroups if requested
|
||||||
|
if (cgroupMount && cgroupMountPath != null) {
|
||||||
|
ArrayList<String> cgroupKVs = new ArrayList<String>();
|
||||||
|
cgroupKVs.add(CONTROLLER_CPU + "=" + cgroupMountPath + "/" +
|
||||||
|
CONTROLLER_CPU);
|
||||||
|
lce.mountCgroups(cgroupKVs, cgroupPrefix);
|
||||||
|
}
|
||||||
|
|
||||||
|
initializeControllerPaths();
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
boolean isCpuWeightEnabled() {
|
||||||
|
return this.cpuWeightEnabled;
|
||||||
|
}
|
||||||
|
|
||||||
|
/*
|
||||||
|
* Next four functions are for an individual cgroup.
|
||||||
|
*/
|
||||||
|
|
||||||
|
private String pathForCgroup(String controller, String groupName) {
|
||||||
|
String controllerPath = controllerPaths.get(controller);
|
||||||
|
return controllerPath + "/" + cgroupPrefix + "/" + groupName;
|
||||||
|
}
|
||||||
|
|
||||||
|
private void createCgroup(String controller, String groupName)
|
||||||
|
throws IOException {
|
||||||
|
String path = pathForCgroup(controller, groupName);
|
||||||
|
|
||||||
|
if (LOG.isDebugEnabled()) {
|
||||||
|
LOG.debug("createCgroup: " + path);
|
||||||
|
}
|
||||||
|
|
||||||
|
if (! new File(path).mkdir()) {
|
||||||
|
throw new IOException("Failed to create cgroup at " + path);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private void updateCgroup(String controller, String groupName, String param,
|
||||||
|
String value) throws IOException {
|
||||||
|
FileWriter f = null;
|
||||||
|
String path = pathForCgroup(controller, groupName);
|
||||||
|
param = controller + "." + param;
|
||||||
|
|
||||||
|
if (LOG.isDebugEnabled()) {
|
||||||
|
LOG.debug("updateCgroup: " + path + ": " + param + "=" + value);
|
||||||
|
}
|
||||||
|
|
||||||
|
try {
|
||||||
|
f = new FileWriter(path + "/" + param, false);
|
||||||
|
f.write(value);
|
||||||
|
} catch (IOException e) {
|
||||||
|
throw new IOException("Unable to set " + param + "=" + value +
|
||||||
|
" for cgroup at: " + path, e);
|
||||||
|
} finally {
|
||||||
|
if (f != null) {
|
||||||
|
try {
|
||||||
|
f.close();
|
||||||
|
} catch (IOException e) {
|
||||||
|
LOG.warn("Unable to close cgroup file: " +
|
||||||
|
path, e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private void deleteCgroup(String controller, String groupName) {
|
||||||
|
String path = pathForCgroup(controller, groupName);
|
||||||
|
|
||||||
|
LOG.debug("deleteCgroup: " + path);
|
||||||
|
|
||||||
|
if (! new File(path).delete()) {
|
||||||
|
LOG.warn("Unable to delete cgroup at: " + path);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/*
|
||||||
|
* Next three functions operate on all the resources we are enforcing.
|
||||||
|
*/
|
||||||
|
|
||||||
|
/*
|
||||||
|
* TODO: After YARN-2 is committed, we should call containerResource.getCpus()
|
||||||
|
* (or equivalent) to multiply the weight by the number of requested cpus.
|
||||||
|
*/
|
||||||
|
private void setupLimits(ContainerId containerId,
|
||||||
|
Resource containerResource) throws IOException {
|
||||||
|
String containerName = containerId.toString();
|
||||||
|
|
||||||
|
if (isCpuWeightEnabled()) {
|
||||||
|
createCgroup(CONTROLLER_CPU, containerName);
|
||||||
|
updateCgroup(CONTROLLER_CPU, containerName, "shares",
|
||||||
|
String.valueOf(CPU_DEFAULT_WEIGHT));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private void clearLimits(ContainerId containerId) {
|
||||||
|
String containerName = containerId.toString();
|
||||||
|
|
||||||
|
// Based on testing, ApplicationMaster executables don't terminate until
|
||||||
|
// a little after the container appears to have finished. Therefore, we
|
||||||
|
// wait a short bit for the cgroup to become empty before deleting it.
|
||||||
|
if (containerId.getId() == 1) {
|
||||||
|
try {
|
||||||
|
Thread.sleep(500);
|
||||||
|
} catch (InterruptedException e) {
|
||||||
|
// not a problem, continue anyway
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if (isCpuWeightEnabled()) {
|
||||||
|
deleteCgroup(CONTROLLER_CPU, containerName);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/*
|
||||||
|
* LCE Resources Handler interface
|
||||||
|
*/
|
||||||
|
|
||||||
|
public void preExecute(ContainerId containerId, Resource containerResource)
|
||||||
|
throws IOException {
|
||||||
|
setupLimits(containerId, containerResource);
|
||||||
|
}
|
||||||
|
|
||||||
|
public void postExecute(ContainerId containerId) {
|
||||||
|
clearLimits(containerId);
|
||||||
|
}
|
||||||
|
|
||||||
|
public String getResourcesOption(ContainerId containerId) {
|
||||||
|
String containerName = containerId.toString();
|
||||||
|
|
||||||
|
StringBuilder sb = new StringBuilder("cgroups=");
|
||||||
|
|
||||||
|
if (isCpuWeightEnabled()) {
|
||||||
|
sb.append(pathForCgroup(CONTROLLER_CPU, containerName) + "/cgroup.procs");
|
||||||
|
sb.append(",");
|
||||||
|
}
|
||||||
|
|
||||||
|
if (sb.charAt(sb.length() - 1) == ',') {
|
||||||
|
sb.deleteCharAt(sb.length() - 1);
|
||||||
|
}
|
||||||
|
|
||||||
|
return sb.toString();
|
||||||
|
}
|
||||||
|
|
||||||
|
/* We are looking for entries of the form:
|
||||||
|
* none /cgroup/path/mem cgroup rw,memory 0 0
|
||||||
|
*
|
||||||
|
* Use a simple pattern that splits on the five spaces, and
|
||||||
|
* grabs the 2, 3, and 4th fields.
|
||||||
|
*/
|
||||||
|
|
||||||
|
private static final Pattern MTAB_FILE_FORMAT = Pattern.compile(
|
||||||
|
"^[^\\s]+\\s([^\\s]+)\\s([^\\s]+)\\s([^\\s]+)\\s[^\\s]+\\s[^\\s]+$");
|
||||||
|
|
||||||
|
/*
|
||||||
|
* Returns a map: path -> mount options
|
||||||
|
* for mounts with type "cgroup". Cgroup controllers will
|
||||||
|
* appear in the list of options for a path.
|
||||||
|
*/
|
||||||
|
private Map<String, List<String>> parseMtab() throws IOException {
|
||||||
|
Map<String, List<String>> ret = new HashMap<String, List<String>>();
|
||||||
|
BufferedReader in = null;
|
||||||
|
|
||||||
|
try {
|
||||||
|
in = new BufferedReader(new FileReader(new File(MTAB_FILE)));
|
||||||
|
|
||||||
|
for (String str = in.readLine(); str != null;
|
||||||
|
str = in.readLine()) {
|
||||||
|
Matcher m = MTAB_FILE_FORMAT.matcher(str);
|
||||||
|
boolean mat = m.find();
|
||||||
|
if (mat) {
|
||||||
|
String path = m.group(1);
|
||||||
|
String type = m.group(2);
|
||||||
|
String options = m.group(3);
|
||||||
|
|
||||||
|
if (type.equals(CGROUPS_FSTYPE)) {
|
||||||
|
List<String> value = Arrays.asList(options.split(","));
|
||||||
|
ret.put(path, value);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
} catch (IOException e) {
|
||||||
|
throw new IOException("Error while reading " + MTAB_FILE, e);
|
||||||
|
} finally {
|
||||||
|
// Close the streams
|
||||||
|
try {
|
||||||
|
in.close();
|
||||||
|
} catch (IOException e2) {
|
||||||
|
LOG.warn("Error closing the stream: " + MTAB_FILE, e2);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return ret;
|
||||||
|
}
|
||||||
|
|
||||||
|
private String findControllerInMtab(String controller,
|
||||||
|
Map<String, List<String>> entries) {
|
||||||
|
for (Entry<String, List<String>> e : entries.entrySet()) {
|
||||||
|
if (e.getValue().contains(controller))
|
||||||
|
return e.getKey();
|
||||||
|
}
|
||||||
|
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
|
||||||
|
private void initializeControllerPaths() throws IOException {
|
||||||
|
String controllerPath;
|
||||||
|
Map<String, List<String>> parsedMtab = parseMtab();
|
||||||
|
|
||||||
|
// CPU
|
||||||
|
|
||||||
|
controllerPath = findControllerInMtab(CONTROLLER_CPU, parsedMtab);
|
||||||
|
|
||||||
|
if (controllerPath != null) {
|
||||||
|
File f = new File(controllerPath + "/" + this.cgroupPrefix);
|
||||||
|
|
||||||
|
if (f.canWrite()) {
|
||||||
|
controllerPaths.put(CONTROLLER_CPU, controllerPath);
|
||||||
|
} else {
|
||||||
|
throw new IOException("Not able to enforce cpu weights; cannot write "
|
||||||
|
+ "to cgroup at: " + controllerPath);
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
throw new IOException("Not able to enforce cpu weights; cannot find "
|
||||||
|
+ "cgroup for cpu controller in " + MTAB_FILE);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
|
@ -0,0 +1,65 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.apache.hadoop.yarn.server.nodemanager.util;
|
||||||
|
|
||||||
|
import org.apache.commons.logging.Log;
|
||||||
|
import org.apache.commons.logging.LogFactory;
|
||||||
|
import org.apache.hadoop.conf.Configuration;
|
||||||
|
import org.apache.hadoop.yarn.api.records.ContainerId;
|
||||||
|
import org.apache.hadoop.yarn.api.records.Resource;
|
||||||
|
import org.apache.hadoop.yarn.server.nodemanager.LinuxContainerExecutor;
|
||||||
|
|
||||||
|
public class DefaultLCEResourcesHandler implements LCEResourcesHandler {
|
||||||
|
|
||||||
|
final static Log LOG = LogFactory
|
||||||
|
.getLog(DefaultLCEResourcesHandler.class);
|
||||||
|
|
||||||
|
private Configuration conf;
|
||||||
|
|
||||||
|
public DefaultLCEResourcesHandler() {
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setConf(Configuration conf) {
|
||||||
|
this.conf = conf;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Configuration getConf() {
|
||||||
|
return conf;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void init(LinuxContainerExecutor lce) {
|
||||||
|
}
|
||||||
|
|
||||||
|
/*
|
||||||
|
* LCE Resources Handler interface
|
||||||
|
*/
|
||||||
|
|
||||||
|
public void preExecute(ContainerId containerId, Resource containerResource) {
|
||||||
|
}
|
||||||
|
|
||||||
|
public void postExecute(ContainerId containerId) {
|
||||||
|
}
|
||||||
|
|
||||||
|
public String getResourcesOption(ContainerId containerId) {
|
||||||
|
return "cgroups=none";
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
}
|
|
@ -0,0 +1,49 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.apache.hadoop.yarn.server.nodemanager.util;
|
||||||
|
|
||||||
|
import java.io.IOException;
|
||||||
|
|
||||||
|
import org.apache.hadoop.conf.Configurable;
|
||||||
|
import org.apache.hadoop.yarn.api.records.ContainerId;
|
||||||
|
import org.apache.hadoop.yarn.api.records.Resource;
|
||||||
|
import org.apache.hadoop.yarn.server.nodemanager.LinuxContainerExecutor;
|
||||||
|
|
||||||
|
public interface LCEResourcesHandler extends Configurable {
|
||||||
|
|
||||||
|
void init(LinuxContainerExecutor lce) throws IOException;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Called by the LinuxContainerExecutor before launching the executable
|
||||||
|
* inside the container.
|
||||||
|
* @param containerId the id of the container being launched
|
||||||
|
* @param containerResource the node resources the container will be using
|
||||||
|
*/
|
||||||
|
void preExecute(ContainerId containerId, Resource containerResource)
|
||||||
|
throws IOException;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Called by the LinuxContainerExecutor after the executable inside the
|
||||||
|
* container has exited (successfully or not).
|
||||||
|
* @param containerId the id of the container which was launched
|
||||||
|
*/
|
||||||
|
void postExecute(ContainerId containerId);
|
||||||
|
|
||||||
|
String getResourcesOption(ContainerId containerId);
|
||||||
|
}
|
|
@ -308,7 +308,7 @@ char ** extract_values(char *value) {
|
||||||
tempTok = strtok_r(NULL, ",", &tempstr);
|
tempTok = strtok_r(NULL, ",", &tempstr);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
if (size > 0) {
|
if (toPass != NULL) {
|
||||||
toPass[size] = NULL;
|
toPass[size] = NULL;
|
||||||
}
|
}
|
||||||
return toPass;
|
return toPass;
|
||||||
|
@ -323,3 +323,52 @@ void free_values(char** values) {
|
||||||
free(values);
|
free(values);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* If str is a string of the form key=val, find 'key'
|
||||||
|
*/
|
||||||
|
int get_kv_key(const char *input, char *out, size_t out_len) {
|
||||||
|
|
||||||
|
if (input == NULL)
|
||||||
|
return -EINVAL;
|
||||||
|
|
||||||
|
char *split = strchr(input, '=');
|
||||||
|
|
||||||
|
if (split == NULL)
|
||||||
|
return -EINVAL;
|
||||||
|
|
||||||
|
int key_len = split - input;
|
||||||
|
|
||||||
|
if (out_len < (key_len + 1) || out == NULL)
|
||||||
|
return -ENAMETOOLONG;
|
||||||
|
|
||||||
|
memcpy(out, input, key_len);
|
||||||
|
out[key_len] = '\0';
|
||||||
|
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* If str is a string of the form key=val, find 'val'
|
||||||
|
*/
|
||||||
|
int get_kv_value(const char *input, char *out, size_t out_len) {
|
||||||
|
|
||||||
|
if (input == NULL)
|
||||||
|
return -EINVAL;
|
||||||
|
|
||||||
|
char *split = strchr(input, '=');
|
||||||
|
|
||||||
|
if (split == NULL)
|
||||||
|
return -EINVAL;
|
||||||
|
|
||||||
|
split++; // advance past '=' to the value
|
||||||
|
int val_len = (input + strlen(input)) - split;
|
||||||
|
|
||||||
|
if (out_len < (val_len + 1) || out == NULL)
|
||||||
|
return -ENAMETOOLONG;
|
||||||
|
|
||||||
|
memcpy(out, split, val_len);
|
||||||
|
out[val_len] = '\0';
|
||||||
|
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
|
@ -16,6 +16,8 @@
|
||||||
* limitations under the License.
|
* limitations under the License.
|
||||||
*/
|
*/
|
||||||
|
|
||||||
|
#include <stddef.h>
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Ensure that the configuration file and all of the containing directories
|
* Ensure that the configuration file and all of the containing directories
|
||||||
* are only writable by root. Otherwise, an attacker can change the
|
* are only writable by root. Otherwise, an attacker can change the
|
||||||
|
@ -50,3 +52,28 @@ void free_values(char** values);
|
||||||
//method to free allocated configuration
|
//method to free allocated configuration
|
||||||
void free_configurations();
|
void free_configurations();
|
||||||
|
|
||||||
|
/**
|
||||||
|
* If str is a string of the form key=val, find 'key'
|
||||||
|
*
|
||||||
|
* @param input The input string
|
||||||
|
* @param out Where to put the output string.
|
||||||
|
* @param out_len The length of the output buffer.
|
||||||
|
*
|
||||||
|
* @return -ENAMETOOLONG if out_len is not long enough;
|
||||||
|
* -EINVAL if there is no equals sign in the input;
|
||||||
|
* 0 on success
|
||||||
|
*/
|
||||||
|
int get_kv_key(const char *input, char *out, size_t out_len);
|
||||||
|
|
||||||
|
/**
|
||||||
|
* If str is a string of the form key=val, find 'val'
|
||||||
|
*
|
||||||
|
* @param input The input string
|
||||||
|
* @param out Where to put the output string.
|
||||||
|
* @param out_len The length of the output buffer.
|
||||||
|
*
|
||||||
|
* @return -ENAMETOOLONG if out_len is not long enough;
|
||||||
|
* -EINVAL if there is no equals sign in the input;
|
||||||
|
* 0 on success
|
||||||
|
*/
|
||||||
|
int get_kv_value(const char *input, char *out, size_t out_len);
|
||||||
|
|
|
@ -31,6 +31,7 @@
|
||||||
#include <stdlib.h>
|
#include <stdlib.h>
|
||||||
#include <string.h>
|
#include <string.h>
|
||||||
#include <sys/stat.h>
|
#include <sys/stat.h>
|
||||||
|
#include <sys/mount.h>
|
||||||
|
|
||||||
static const int DEFAULT_MIN_USERID = 1000;
|
static const int DEFAULT_MIN_USERID = 1000;
|
||||||
|
|
||||||
|
@ -150,6 +151,44 @@ static int change_effective_user(uid_t user, gid_t group) {
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Write the pid of the current process to the cgroup file.
|
||||||
|
* cgroup_file: Path to cgroup file where pid needs to be written to.
|
||||||
|
*/
|
||||||
|
static int write_pid_to_cgroup_as_root(const char* cgroup_file, pid_t pid) {
|
||||||
|
uid_t user = geteuid();
|
||||||
|
gid_t group = getegid();
|
||||||
|
if (change_effective_user(0, 0) != 0) {
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
|
// open
|
||||||
|
int cgroup_fd = open(cgroup_file, O_WRONLY | O_APPEND, 0);
|
||||||
|
if (cgroup_fd == -1) {
|
||||||
|
fprintf(LOGFILE, "Can't open file %s as node manager - %s\n", cgroup_file,
|
||||||
|
strerror(errno));
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
|
// write pid
|
||||||
|
char pid_buf[21];
|
||||||
|
snprintf(pid_buf, sizeof(pid_buf), "%d", pid);
|
||||||
|
ssize_t written = write(cgroup_fd, pid_buf, strlen(pid_buf));
|
||||||
|
close(cgroup_fd);
|
||||||
|
if (written == -1) {
|
||||||
|
fprintf(LOGFILE, "Failed to write pid to file %s - %s\n",
|
||||||
|
cgroup_file, strerror(errno));
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
|
// Revert back to the calling user.
|
||||||
|
if (change_effective_user(user, group)) {
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Write the pid of the current process into the pid file.
|
* Write the pid of the current process into the pid file.
|
||||||
* pid_file: Path to pid file where pid needs to be written to
|
* pid_file: Path to pid file where pid needs to be written to
|
||||||
|
@ -810,7 +849,8 @@ int launch_container_as_user(const char *user, const char *app_id,
|
||||||
const char *container_id, const char *work_dir,
|
const char *container_id, const char *work_dir,
|
||||||
const char *script_name, const char *cred_file,
|
const char *script_name, const char *cred_file,
|
||||||
const char* pid_file, char* const* local_dirs,
|
const char* pid_file, char* const* local_dirs,
|
||||||
char* const* log_dirs) {
|
char* const* log_dirs, const char *resources_key,
|
||||||
|
char* const* resources_values) {
|
||||||
int exit_code = -1;
|
int exit_code = -1;
|
||||||
char *script_file_dest = NULL;
|
char *script_file_dest = NULL;
|
||||||
char *cred_file_dest = NULL;
|
char *cred_file_dest = NULL;
|
||||||
|
@ -851,6 +891,21 @@ int launch_container_as_user(const char *user, const char *app_id,
|
||||||
goto cleanup;
|
goto cleanup;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// cgroups-based resource enforcement
|
||||||
|
if (resources_key != NULL && ! strcmp(resources_key, "cgroups")) {
|
||||||
|
|
||||||
|
// write pid to cgroups
|
||||||
|
char* const* cgroup_ptr;
|
||||||
|
for (cgroup_ptr = resources_values; cgroup_ptr != NULL &&
|
||||||
|
*cgroup_ptr != NULL; ++cgroup_ptr) {
|
||||||
|
if (strcmp(*cgroup_ptr, "none") != 0 &&
|
||||||
|
write_pid_to_cgroup_as_root(*cgroup_ptr, pid) != 0) {
|
||||||
|
exit_code = WRITE_CGROUP_FAILED;
|
||||||
|
goto cleanup;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// give up root privs
|
// give up root privs
|
||||||
if (change_user(user_detail->pw_uid, user_detail->pw_gid) != 0) {
|
if (change_user(user_detail->pw_uid, user_detail->pw_gid) != 0) {
|
||||||
exit_code = SETUID_OPER_FAILED;
|
exit_code = SETUID_OPER_FAILED;
|
||||||
|
@ -1108,4 +1163,73 @@ int delete_as_user(const char *user,
|
||||||
return ret;
|
return ret;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
void chown_dir_contents(const char *dir_path, uid_t uid, gid_t gid) {
|
||||||
|
DIR *dp;
|
||||||
|
struct dirent *ep;
|
||||||
|
|
||||||
|
char *path_tmp = malloc(strlen(dir_path) + NAME_MAX + 2);
|
||||||
|
if (path_tmp == NULL) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
char *buf = stpncpy(path_tmp, dir_path, strlen(dir_path));
|
||||||
|
*buf++ = '/';
|
||||||
|
|
||||||
|
dp = opendir(dir_path);
|
||||||
|
if (dp != NULL) {
|
||||||
|
while (ep = readdir(dp)) {
|
||||||
|
stpncpy(buf, ep->d_name, strlen(ep->d_name));
|
||||||
|
buf[strlen(ep->d_name)] = '\0';
|
||||||
|
change_owner(path_tmp, uid, gid);
|
||||||
|
}
|
||||||
|
closedir(dp);
|
||||||
|
}
|
||||||
|
|
||||||
|
free(path_tmp);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Mount a cgroup controller at the requested mount point and create
|
||||||
|
* a hierarchy for the Hadoop NodeManager to manage.
|
||||||
|
* pair: a key-value pair of the form "controller=mount-path"
|
||||||
|
* hierarchy: the top directory of the hierarchy for the NM
|
||||||
|
*/
|
||||||
|
int mount_cgroup(const char *pair, const char *hierarchy) {
|
||||||
|
char *controller = malloc(strlen(pair));
|
||||||
|
char *mount_path = malloc(strlen(pair));
|
||||||
|
char hier_path[PATH_MAX];
|
||||||
|
int result = 0;
|
||||||
|
|
||||||
|
if (get_kv_key(pair, controller, strlen(pair)) < 0 ||
|
||||||
|
get_kv_value(pair, mount_path, strlen(pair)) < 0) {
|
||||||
|
fprintf(LOGFILE, "Failed to mount cgroup controller; invalid option: %s\n",
|
||||||
|
pair);
|
||||||
|
result = -1;
|
||||||
|
} else {
|
||||||
|
if (mount("none", mount_path, "cgroup", 0, controller) == 0) {
|
||||||
|
char *buf = stpncpy(hier_path, mount_path, strlen(mount_path));
|
||||||
|
*buf++ = '/';
|
||||||
|
snprintf(buf, PATH_MAX - (buf - hier_path), "%s", hierarchy);
|
||||||
|
|
||||||
|
// create hierarchy as 0750 and chown to Hadoop NM user
|
||||||
|
const mode_t perms = S_IRWXU | S_IRGRP | S_IXGRP;
|
||||||
|
if (mkdirs(hier_path, perms) == 0) {
|
||||||
|
change_owner(hier_path, nm_uid, nm_gid);
|
||||||
|
chown_dir_contents(hier_path, nm_uid, nm_gid);
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
fprintf(LOGFILE, "Failed to mount cgroup controller %s at %s - %s\n",
|
||||||
|
controller, mount_path, strerror(errno));
|
||||||
|
// if controller is already mounted, don't stop trying to mount others
|
||||||
|
if (errno != EBUSY) {
|
||||||
|
result = -1;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
free(controller);
|
||||||
|
free(mount_path);
|
||||||
|
|
||||||
|
return result;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
|
@ -53,7 +53,8 @@ enum errorcodes {
|
||||||
// PREPARE_JOB_LOGS_FAILED (NOT USED) 23
|
// PREPARE_JOB_LOGS_FAILED (NOT USED) 23
|
||||||
INVALID_CONFIG_FILE = 24,
|
INVALID_CONFIG_FILE = 24,
|
||||||
SETSID_OPER_FAILED = 25,
|
SETSID_OPER_FAILED = 25,
|
||||||
WRITE_PIDFILE_FAILED = 26
|
WRITE_PIDFILE_FAILED = 26,
|
||||||
|
WRITE_CGROUP_FAILED = 27
|
||||||
};
|
};
|
||||||
|
|
||||||
#define NM_GROUP_KEY "yarn.nodemanager.linux-container-executor.group"
|
#define NM_GROUP_KEY "yarn.nodemanager.linux-container-executor.group"
|
||||||
|
@ -111,13 +112,16 @@ int initialize_app(const char *user, const char *app_id,
|
||||||
* @param pid_file file where pid of process should be written to
|
* @param pid_file file where pid of process should be written to
|
||||||
* @param local_dirs nodemanager-local-directories to be used
|
* @param local_dirs nodemanager-local-directories to be used
|
||||||
* @param log_dirs nodemanager-log-directories to be used
|
* @param log_dirs nodemanager-log-directories to be used
|
||||||
|
* @param resources_key type of resource enforcement (none, cgroups)
|
||||||
|
* @param resources_value values needed to apply resource enforcement
|
||||||
* @return -1 or errorcode enum value on error (should never return on success).
|
* @return -1 or errorcode enum value on error (should never return on success).
|
||||||
*/
|
*/
|
||||||
int launch_container_as_user(const char * user, const char *app_id,
|
int launch_container_as_user(const char * user, const char *app_id,
|
||||||
const char *container_id, const char *work_dir,
|
const char *container_id, const char *work_dir,
|
||||||
const char *script_name, const char *cred_file,
|
const char *script_name, const char *cred_file,
|
||||||
const char *pid_file, char* const* local_dirs,
|
const char *pid_file, char* const* local_dirs,
|
||||||
char* const* log_dirs);
|
char* const* log_dirs, const char *resources_key,
|
||||||
|
char* const* resources_value);
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Function used to signal a container launched by the user.
|
* Function used to signal a container launched by the user.
|
||||||
|
@ -196,3 +200,5 @@ int initialize_user(const char *user, char* const* local_dirs);
|
||||||
int create_directory_for_user(const char* path);
|
int create_directory_for_user(const char* path);
|
||||||
|
|
||||||
int change_user(uid_t user, gid_t group);
|
int change_user(uid_t user, gid_t group);
|
||||||
|
|
||||||
|
int mount_cgroup(const char *pair, const char *hierarchy);
|
||||||
|
|
|
@ -45,6 +45,9 @@
|
||||||
void display_usage(FILE *stream) {
|
void display_usage(FILE *stream) {
|
||||||
fprintf(stream,
|
fprintf(stream,
|
||||||
"Usage: container-executor --checksetup\n");
|
"Usage: container-executor --checksetup\n");
|
||||||
|
fprintf(stream,
|
||||||
|
"Usage: container-executor --mount-cgroups "\
|
||||||
|
"hierarchy controller=path...\n");
|
||||||
fprintf(stream,
|
fprintf(stream,
|
||||||
"Usage: container-executor user command command-args\n");
|
"Usage: container-executor user command command-args\n");
|
||||||
fprintf(stream, "Commands:\n");
|
fprintf(stream, "Commands:\n");
|
||||||
|
@ -52,7 +55,7 @@ void display_usage(FILE *stream) {
|
||||||
"nm-local-dirs nm-log-dirs cmd app...\n", INITIALIZE_CONTAINER);
|
"nm-local-dirs nm-log-dirs cmd app...\n", INITIALIZE_CONTAINER);
|
||||||
fprintf(stream,
|
fprintf(stream,
|
||||||
" launch container: %2d appid containerid workdir "\
|
" launch container: %2d appid containerid workdir "\
|
||||||
"container-script tokens pidfile nm-local-dirs nm-log-dirs\n",
|
"container-script tokens pidfile nm-local-dirs nm-log-dirs resources\n",
|
||||||
LAUNCH_CONTAINER);
|
LAUNCH_CONTAINER);
|
||||||
fprintf(stream, " signal container: %2d container-pid signal\n",
|
fprintf(stream, " signal container: %2d container-pid signal\n",
|
||||||
SIGNAL_CONTAINER);
|
SIGNAL_CONTAINER);
|
||||||
|
@ -63,14 +66,21 @@ void display_usage(FILE *stream) {
|
||||||
int main(int argc, char **argv) {
|
int main(int argc, char **argv) {
|
||||||
int invalid_args = 0;
|
int invalid_args = 0;
|
||||||
int do_check_setup = 0;
|
int do_check_setup = 0;
|
||||||
|
int do_mount_cgroups = 0;
|
||||||
|
|
||||||
LOGFILE = stdout;
|
LOGFILE = stdout;
|
||||||
ERRORFILE = stderr;
|
ERRORFILE = stderr;
|
||||||
|
|
||||||
|
if (argc > 1) {
|
||||||
|
if (strcmp("--mount-cgroups", argv[1]) == 0) {
|
||||||
|
do_mount_cgroups = 1;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// Minimum number of arguments required to run
|
// Minimum number of arguments required to run
|
||||||
// the std. container-executor commands is 4
|
// the std. container-executor commands is 4
|
||||||
// 4 args not needed for checksetup option
|
// 4 args not needed for checksetup option
|
||||||
if (argc < 4) {
|
if (argc < 4 && !do_mount_cgroups) {
|
||||||
invalid_args = 1;
|
invalid_args = 1;
|
||||||
if (argc == 2) {
|
if (argc == 2) {
|
||||||
const char *arg1 = argv[1];
|
const char *arg1 = argv[1];
|
||||||
|
@ -103,6 +113,7 @@ int main(int argc, char **argv) {
|
||||||
char *orig_conf_file = HADOOP_CONF_DIR "/" CONF_FILENAME;
|
char *orig_conf_file = HADOOP_CONF_DIR "/" CONF_FILENAME;
|
||||||
char *conf_file = resolve_config_path(orig_conf_file, argv[0]);
|
char *conf_file = resolve_config_path(orig_conf_file, argv[0]);
|
||||||
char *local_dirs, *log_dirs;
|
char *local_dirs, *log_dirs;
|
||||||
|
char *resources, *resources_key, *resources_value;
|
||||||
|
|
||||||
if (conf_file == NULL) {
|
if (conf_file == NULL) {
|
||||||
fprintf(ERRORFILE, "Configuration file %s not found.\n", orig_conf_file);
|
fprintf(ERRORFILE, "Configuration file %s not found.\n", orig_conf_file);
|
||||||
|
@ -145,6 +156,18 @@ int main(int argc, char **argv) {
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (do_mount_cgroups) {
|
||||||
|
optind++;
|
||||||
|
char *hierarchy = argv[optind++];
|
||||||
|
int result = 0;
|
||||||
|
|
||||||
|
while (optind < argc && result == 0) {
|
||||||
|
result = mount_cgroup(argv[optind++], hierarchy);
|
||||||
|
}
|
||||||
|
|
||||||
|
return result;
|
||||||
|
}
|
||||||
|
|
||||||
//checks done for user name
|
//checks done for user name
|
||||||
if (argv[optind] == NULL) {
|
if (argv[optind] == NULL) {
|
||||||
fprintf(ERRORFILE, "Invalid user name.\n");
|
fprintf(ERRORFILE, "Invalid user name.\n");
|
||||||
|
@ -180,8 +203,8 @@ int main(int argc, char **argv) {
|
||||||
extract_values(log_dirs), argv + optind);
|
extract_values(log_dirs), argv + optind);
|
||||||
break;
|
break;
|
||||||
case LAUNCH_CONTAINER:
|
case LAUNCH_CONTAINER:
|
||||||
if (argc != 11) {
|
if (argc != 12) {
|
||||||
fprintf(ERRORFILE, "Too few arguments (%d vs 11) for launch container\n",
|
fprintf(ERRORFILE, "Wrong number of arguments (%d vs 12) for launch container\n",
|
||||||
argc);
|
argc);
|
||||||
fflush(ERRORFILE);
|
fflush(ERRORFILE);
|
||||||
return INVALID_ARGUMENT_NUMBER;
|
return INVALID_ARGUMENT_NUMBER;
|
||||||
|
@ -194,10 +217,26 @@ int main(int argc, char **argv) {
|
||||||
pid_file = argv[optind++];
|
pid_file = argv[optind++];
|
||||||
local_dirs = argv[optind++];// good local dirs as a comma separated list
|
local_dirs = argv[optind++];// good local dirs as a comma separated list
|
||||||
log_dirs = argv[optind++];// good log dirs as a comma separated list
|
log_dirs = argv[optind++];// good log dirs as a comma separated list
|
||||||
|
resources = argv[optind++];// key,value pair describing resources
|
||||||
|
char *resources_key = malloc(strlen(resources));
|
||||||
|
char *resources_value = malloc(strlen(resources));
|
||||||
|
if (get_kv_key(resources, resources_key, strlen(resources)) < 0 ||
|
||||||
|
get_kv_value(resources, resources_value, strlen(resources)) < 0) {
|
||||||
|
fprintf(ERRORFILE, "Invalid arguments for cgroups resources: %s",
|
||||||
|
resources);
|
||||||
|
fflush(ERRORFILE);
|
||||||
|
free(resources_key);
|
||||||
|
free(resources_value);
|
||||||
|
return INVALID_ARGUMENT_NUMBER;
|
||||||
|
}
|
||||||
|
char** resources_values = extract_values(resources_value);
|
||||||
exit_code = launch_container_as_user(user_detail->pw_name, app_id,
|
exit_code = launch_container_as_user(user_detail->pw_name, app_id,
|
||||||
container_id, current_dir, script_file, cred_file,
|
container_id, current_dir, script_file, cred_file,
|
||||||
pid_file, extract_values(local_dirs),
|
pid_file, extract_values(local_dirs),
|
||||||
extract_values(log_dirs));
|
extract_values(log_dirs), resources_key,
|
||||||
|
resources_values);
|
||||||
|
free(resources_key);
|
||||||
|
free(resources_value);
|
||||||
break;
|
break;
|
||||||
case SIGNAL_CONTAINER:
|
case SIGNAL_CONTAINER:
|
||||||
if (argc != 5) {
|
if (argc != 5) {
|
||||||
|
|
|
@ -39,6 +39,7 @@
|
||||||
static char* username = NULL;
|
static char* username = NULL;
|
||||||
static char* local_dirs = NULL;
|
static char* local_dirs = NULL;
|
||||||
static char* log_dirs = NULL;
|
static char* log_dirs = NULL;
|
||||||
|
static char* resources = NULL;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Run the command using the effective user id.
|
* Run the command using the effective user id.
|
||||||
|
@ -610,9 +611,17 @@ void test_run_container() {
|
||||||
strerror(errno));
|
strerror(errno));
|
||||||
exit(1);
|
exit(1);
|
||||||
} else if (child == 0) {
|
} else if (child == 0) {
|
||||||
|
char *key = malloc(strlen(resources));
|
||||||
|
char *value = malloc(strlen(resources));
|
||||||
|
if (get_kv_key(resources, key, strlen(resources)) < 0 ||
|
||||||
|
get_kv_value(resources, key, strlen(resources)) < 0) {
|
||||||
|
printf("FAIL: resources failed - %s\n");
|
||||||
|
exit(1);
|
||||||
|
}
|
||||||
if (launch_container_as_user(username, "app_4", "container_1",
|
if (launch_container_as_user(username, "app_4", "container_1",
|
||||||
container_dir, script_name, TEST_ROOT "/creds.txt", pid_file,
|
container_dir, script_name, TEST_ROOT "/creds.txt", pid_file,
|
||||||
extract_values(local_dirs), extract_values(log_dirs)) != 0) {
|
extract_values(local_dirs), extract_values(log_dirs),
|
||||||
|
key, extract_values(value)) != 0) {
|
||||||
printf("FAIL: failed in child\n");
|
printf("FAIL: failed in child\n");
|
||||||
exit(42);
|
exit(42);
|
||||||
}
|
}
|
||||||
|
|
|
@ -126,7 +126,7 @@ public class TestLinuxContainerExecutorWithMocks {
|
||||||
assertEquals(Arrays.asList(appSubmitter, cmd, appId, containerId,
|
assertEquals(Arrays.asList(appSubmitter, cmd, appId, containerId,
|
||||||
workDir.toString(), "/bin/echo", "/dev/null", pidFile.toString(),
|
workDir.toString(), "/bin/echo", "/dev/null", pidFile.toString(),
|
||||||
StringUtils.join(",", dirsHandler.getLocalDirs()),
|
StringUtils.join(",", dirsHandler.getLocalDirs()),
|
||||||
StringUtils.join(",", dirsHandler.getLogDirs())),
|
StringUtils.join(",", dirsHandler.getLogDirs()), "cgroups=none"),
|
||||||
readMockParams());
|
readMockParams());
|
||||||
|
|
||||||
}
|
}
|
||||||
|
@ -211,7 +211,8 @@ public class TestLinuxContainerExecutorWithMocks {
|
||||||
assertEquals(Arrays.asList(appSubmitter, cmd, appId, containerId,
|
assertEquals(Arrays.asList(appSubmitter, cmd, appId, containerId,
|
||||||
workDir.toString(), "/bin/echo", "/dev/null", pidFile.toString(),
|
workDir.toString(), "/bin/echo", "/dev/null", pidFile.toString(),
|
||||||
StringUtils.join(",", dirsHandler.getLocalDirs()),
|
StringUtils.join(",", dirsHandler.getLocalDirs()),
|
||||||
StringUtils.join(",", dirsHandler.getLogDirs())), readMockParams());
|
StringUtils.join(",", dirsHandler.getLogDirs()),
|
||||||
|
"cgroups=none"), readMockParams());
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue