Merge -c 1423706 from trunk to branch-2 to fix YARN-3. Add support for CPU isolation/monitoring of containers. Contributed by Andrew Ferguson.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1443010 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Arun Murthy 2013-02-06 15:17:57 +00:00
parent d898116ded
commit 0c2206db70
15 changed files with 809 additions and 16 deletions

View File

@ -20,6 +20,9 @@ Release 2.0.3-alpha - 2013-02-06
YARN-145. Add a Web UI to the fair share scheduler. (Sandy Ryza via tomwhite)
YARN-3. Add support for CPU isolation/monitoring of containers.
(adferguson via tucu)
YARN-230. RM Restart phase 1 - includes support for saving/restarting all
applications on an RM bounce. (Bikas Saha via acmurthy)

View File

@ -242,4 +242,11 @@
<Bug pattern="EI_EXPOSE_REP2" />
</Match>
<!-- /proc/mounts is always in the same place -->
<Match>
<Class name="org.apache.hadoop.yarn.server.nodemanager.util.CgroupsLCEResourcesHandler" />
<Method name="parseMtab" />
<Bug pattern="DMI_HARDCODED_ABSOLUTE_FILENAME" />
</Match>
</FindBugsFilter>

View File

@ -514,6 +514,24 @@ public class YarnConfiguration extends Configuration {
public static final String NM_LINUX_CONTAINER_GROUP =
NM_PREFIX + "linux-container-executor.group";
/**
* The class which the linux container executor should use to handle
* (enforce) container resources.
*/
public static final String NM_LINUX_CONTAINER_RESOURCES_HANDLER =
NM_PREFIX + "linux-container-executor.resources-handler.class";
/**
* The cgroups hierarchy (path below each controller's mount point) under
* which the linux container executor should place YARN processes.
*/
public static final String NM_LINUX_CONTAINER_CGROUPS_HIERARCHY =
NM_PREFIX + "linux-container-executor.cgroups.hierarchy";
/**
* Whether the linux container executor should attempt to mount cgroups
* itself if they are not found.
*/
public static final String NM_LINUX_CONTAINER_CGROUPS_MOUNT =
NM_PREFIX + "linux-container-executor.cgroups.mount";
/**
* Where the linux container executor should mount cgroups if they are not
* found; only consulted together with
* {@link #NM_LINUX_CONTAINER_CGROUPS_MOUNT}.
*/
public static final String NM_LINUX_CONTAINER_CGROUPS_MOUNT_PATH =
NM_PREFIX + "linux-container-executor.cgroups.mount-path";
/** T-file compression types used to compress aggregated logs.*/
public static final String NM_LOG_AGG_COMPRESSION_TYPE =
NM_PREFIX + "log-aggregation.compression-type";

View File

@ -534,6 +534,39 @@
<name>yarn.nodemanager.linux-container-executor.path</name>
</property>
<property>
<description>The class which should help the LCE handle resources.</description>
<name>yarn.nodemanager.linux-container-executor.resources-handler.class</name>
<value>org.apache.hadoop.yarn.server.nodemanager.util.DefaultLCEResourcesHandler</value>
<!-- <value>org.apache.hadoop.yarn.server.nodemanager.util.CgroupsLCEResourcesHandler</value> -->
</property>
<property>
<description>The cgroups hierarchy under which to place YARN processes (cannot contain commas).
If yarn.nodemanager.linux-container-executor.cgroups.mount is false (that is, if cgroups have
been pre-configured), then this cgroups hierarchy must already exist and be writable by the
NodeManager user, otherwise the NodeManager may fail.
Only used when the LCE resources handler is set to the CgroupsLCEResourcesHandler.</description>
<name>yarn.nodemanager.linux-container-executor.cgroups.hierarchy</name>
<value>/hadoop-yarn</value>
</property>
<property>
<description>Whether the LCE should attempt to mount cgroups if not found.
Only used when the LCE resources handler is set to the CgroupsLCEResourcesHandler.</description>
<name>yarn.nodemanager.linux-container-executor.cgroups.mount</name>
<value>false</value>
</property>
<property>
<description>Where the LCE should attempt to mount cgroups if not found. Common locations
include /sys/fs/cgroup and /cgroup; the default location can vary depending on the Linux
distribution in use. This path must exist before the NodeManager is launched.
Only used when the LCE resources handler is set to the CgroupsLCEResourcesHandler, and
yarn.nodemanager.linux-container-executor.cgroups.mount is true.</description>
<name>yarn.nodemanager.linux-container-executor.cgroups.mount-path</name>
</property>
<property>
<description>T-file compression types used to compress aggregated logs.</description>
<name>yarn.nodemanager.log-aggregation.compression-type</name>

View File

@ -29,6 +29,7 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hadoop.util.Shell.ExitCodeException;
import org.apache.hadoop.util.Shell.ShellCommandExecutor;
import org.apache.hadoop.util.StringUtils;
@ -38,6 +39,8 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerDiagnosticsUpdateEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ContainerLocalizer;
import org.apache.hadoop.yarn.server.nodemanager.util.DefaultLCEResourcesHandler;
import org.apache.hadoop.yarn.server.nodemanager.util.LCEResourcesHandler;
import org.apache.hadoop.yarn.util.ConverterUtils;
public class LinuxContainerExecutor extends ContainerExecutor {
@ -46,11 +49,18 @@ public class LinuxContainerExecutor extends ContainerExecutor {
.getLog(LinuxContainerExecutor.class);
private String containerExecutorExe;
private LCEResourcesHandler resourcesHandler;
/**
 * Stores the configuration, resolves the container-executor binary and
 * instantiates the configured {@link LCEResourcesHandler}.
 *
 * @param conf the configuration to read the executor path and the
 *             resources-handler class from
 */
@Override
public void setConf(Configuration conf) {
  super.setConf(conf);
  this.containerExecutorExe = getContainerExecutorExecutablePath(conf);

  // Falls back to DefaultLCEResourcesHandler when no handler is configured.
  final Class<? extends LCEResourcesHandler> handlerClass = conf.getClass(
      YarnConfiguration.NM_LINUX_CONTAINER_RESOURCES_HANDLER,
      DefaultLCEResourcesHandler.class, LCEResourcesHandler.class);
  this.resourcesHandler = ReflectionUtils.newInstance(handlerClass, conf);
  this.resourcesHandler.setConf(conf);
}
/**
@ -81,7 +91,8 @@ public class LinuxContainerExecutor extends ContainerExecutor {
UNABLE_TO_EXECUTE_CONTAINER_SCRIPT(7),
INVALID_CONTAINER_PID(9),
INVALID_CONTAINER_EXEC_PERMISSIONS(22),
INVALID_CONFIG_FILE(24);
INVALID_CONFIG_FILE(24),
WRITE_CGROUP_FAILED(27);
private final int value;
ResultCode(int value) {
@ -124,6 +135,8 @@ public class LinuxContainerExecutor extends ContainerExecutor {
throw new IOException("Linux container executor not configured properly"
+ " (error=" + exitCode + ")", e);
}
resourcesHandler.init(this);
}
@Override
@ -188,6 +201,11 @@ public class LinuxContainerExecutor extends ContainerExecutor {
ContainerId containerId = container.getContainerID();
String containerIdStr = ConverterUtils.toString(containerId);
resourcesHandler.preExecute(containerId,
container.getLaunchContext().getResource());
String resourcesOptions = resourcesHandler.getResourcesOption(
containerId);
ShellCommandExecutor shExec = null;
@ -202,7 +220,8 @@ public class LinuxContainerExecutor extends ContainerExecutor {
nmPrivateTokensPath.toUri().getPath().toString(),
pidFilePath.toString(),
StringUtils.join(",", localDirs),
StringUtils.join(",", logDirs)));
StringUtils.join(",", logDirs),
resourcesOptions));
String[] commandArray = command.toArray(new String[command.size()]);
shExec = new ShellCommandExecutor(commandArray, null, // NM's cwd
container.getLaunchContext().getEnvironment()); // sanitized env
@ -241,7 +260,7 @@ public class LinuxContainerExecutor extends ContainerExecutor {
}
return exitCode;
} finally {
; //
resourcesHandler.postExecute(containerId);
}
if (LOG.isDebugEnabled()) {
LOG.debug("Output from LinuxContainerExecutor's launchContainer follows:");
@ -316,4 +335,27 @@ public class LinuxContainerExecutor extends ContainerExecutor {
}
}
}
/**
 * Runs the container-executor binary with {@code --mount-cgroups} so that
 * the requested cgroup controllers get mounted.
 *
 * @param cgroupKVs entries of the form {@code controller=mount-path}
 * @param hierarchy the hierarchy argument handed to the binary
 * @throws IOException if the command fails; the exit code is included in
 *         the message and the original exception is kept as the cause
 */
public void mountCgroups(List<String> cgroupKVs, String hierarchy)
    throws IOException {
  final List<String> args = new ArrayList<String>();
  args.add(containerExecutorExe);
  args.add("--mount-cgroups");
  args.add(hierarchy);
  args.addAll(cgroupKVs);

  final String[] argv = args.toArray(new String[args.size()]);
  final ShellCommandExecutor executor = new ShellCommandExecutor(argv);

  if (LOG.isDebugEnabled()) {
    LOG.debug("mountCgroups: " + Arrays.toString(argv));
  }

  try {
    executor.execute();
  } catch (IOException e) {
    final int exitStatus = executor.getExitCode();
    logOutput(executor.getOutput());
    throw new IOException("Problem mounting cgroups " + cgroupKVs +
        "; exit code = " + exitStatus, e);
  }
}
}

View File

@ -0,0 +1,321 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.nodemanager.util;
import java.io.BufferedReader;
import java.io.File;
import java.io.FileReader;
import java.io.FileWriter;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.server.nodemanager.LinuxContainerExecutor;
/**
 * {@link LCEResourcesHandler} which places every container into its own
 * cgroup below the configured hierarchy of the {@code cpu} controller.
 *
 * <p>The controller mount point is discovered by parsing
 * {@code /proc/mounts}; optionally the controller is mounted first through
 * the {@link LinuxContainerExecutor}.</p>
 */
public class CgroupsLCEResourcesHandler implements LCEResourcesHandler {

  final static Log LOG = LogFactory
      .getLog(CgroupsLCEResourcesHandler.class);

  private Configuration conf;
  private String cgroupPrefix;
  private boolean cgroupMount;
  private String cgroupMountPath;

  private boolean cpuWeightEnabled = true;

  private static final String MTAB_FILE = "/proc/mounts";
  private static final String CGROUPS_FSTYPE = "cgroup";
  private static final String CONTROLLER_CPU = "cpu";
  private static final int CPU_DEFAULT_WEIGHT = 1024; // set by kernel
  private final Map<String, String> controllerPaths; // Controller -> path

  public CgroupsLCEResourcesHandler() {
    this.controllerPaths = new HashMap<String, String>();
  }

  @Override
  public void setConf(Configuration conf) {
    this.conf = conf;
  }

  @Override
  public Configuration getConf() {
    return conf;
  }

  /**
   * Reads the cgroups settings from the configuration, optionally asks the
   * executor to mount the cpu controller, and then locates the controller
   * in {@code /proc/mounts}.
   *
   * @param lce executor used to mount cgroups when mounting is enabled and
   *            a mount path is configured
   * @throws IOException if mounting fails, the mount table cannot be read,
   *         the cpu controller is not mounted, or the hierarchy below it is
   *         not writable
   */
  @Override
  public synchronized void init(LinuxContainerExecutor lce) throws IOException {

    this.cgroupPrefix = conf.get(YarnConfiguration.
        NM_LINUX_CONTAINER_CGROUPS_HIERARCHY, "/hadoop-yarn");
    this.cgroupMount = conf.getBoolean(YarnConfiguration.
        NM_LINUX_CONTAINER_CGROUPS_MOUNT, false);
    this.cgroupMountPath = conf.get(YarnConfiguration.
        NM_LINUX_CONTAINER_CGROUPS_MOUNT_PATH, null);

    // Remove one extra '/' at the start and at the end of cgroupPrefix.
    // startsWith/endsWith are used (rather than charAt) so that an empty
    // value or a bare "/" does not blow up with an index exception.
    if (cgroupPrefix.startsWith("/")) {
      cgroupPrefix = cgroupPrefix.substring(1);
    }
    if (cgroupPrefix.endsWith("/")) {
      cgroupPrefix = cgroupPrefix.substring(0, cgroupPrefix.length() - 1);
    }

    // mount cgroups if requested
    if (cgroupMount && cgroupMountPath != null) {
      ArrayList<String> cgroupKVs = new ArrayList<String>();
      cgroupKVs.add(CONTROLLER_CPU + "=" + cgroupMountPath + "/" +
          CONTROLLER_CPU);
      lce.mountCgroups(cgroupKVs, cgroupPrefix);
    }

    initializeControllerPaths();
  }

  boolean isCpuWeightEnabled() {
    return this.cpuWeightEnabled;
  }

  /*
   * Next four functions are for an individual cgroup.
   */

  /** Builds {@code <controller mount>/<hierarchy>/<groupName>}. */
  private String pathForCgroup(String controller, String groupName) {
    String controllerPath = controllerPaths.get(controller);
    return controllerPath + "/" + cgroupPrefix + "/" + groupName;
  }

  /**
   * Creates the directory representing the cgroup.
   *
   * @throws IOException if the directory could not be created
   */
  private void createCgroup(String controller, String groupName)
      throws IOException {
    String path = pathForCgroup(controller, groupName);

    if (LOG.isDebugEnabled()) {
      LOG.debug("createCgroup: " + path);
    }

    if (! new File(path).mkdir()) {
      throw new IOException("Failed to create cgroup at " + path);
    }
  }

  /**
   * Writes {@code value} into the file {@code <controller>.<param>} of the
   * given cgroup.
   *
   * @throws IOException if the file cannot be opened or the value cannot be
   *         written
   */
  private void updateCgroup(String controller, String groupName, String param,
      String value) throws IOException {
    FileWriter f = null;
    String path = pathForCgroup(controller, groupName);
    param = controller + "." + param;

    if (LOG.isDebugEnabled()) {
      LOG.debug("updateCgroup: " + path + ": " + param + "=" + value);
    }

    try {
      f = new FileWriter(path + "/" + param, false);
      f.write(value);
      // FileWriter buffers; flush here so that a rejected write is reported
      // as a failure instead of surfacing only in close() below, where it
      // would merely be logged.
      f.flush();
    } catch (IOException e) {
      throw new IOException("Unable to set " + param + "=" + value +
          " for cgroup at: " + path, e);
    } finally {
      if (f != null) {
        try {
          f.close();
        } catch (IOException e) {
          LOG.warn("Unable to close cgroup file: " +
              path, e);
        }
      }
    }
  }

  /** Removes the cgroup directory; failure is only logged. */
  private void deleteCgroup(String controller, String groupName) {
    String path = pathForCgroup(controller, groupName);

    if (LOG.isDebugEnabled()) {
      LOG.debug("deleteCgroup: " + path);
    }

    if (! new File(path).delete()) {
      LOG.warn("Unable to delete cgroup at: " + path);
    }
  }

  /*
   * Next three functions operate on all the resources we are enforcing.
   */

  /*
   * TODO: After YARN-2 is committed, we should call containerResource.getCpus()
   * (or equivalent) to multiply the weight by the number of requested cpus.
   */
  private void setupLimits(ContainerId containerId,
      Resource containerResource) throws IOException {
    String containerName = containerId.toString();

    if (isCpuWeightEnabled()) {
      createCgroup(CONTROLLER_CPU, containerName);
      updateCgroup(CONTROLLER_CPU, containerName, "shares",
          String.valueOf(CPU_DEFAULT_WEIGHT));
    }
  }

  private void clearLimits(ContainerId containerId) {
    String containerName = containerId.toString();

    // Based on testing, ApplicationMaster executables don't terminate until
    // a little after the container appears to have finished. Therefore, we
    // wait a short bit for the cgroup to become empty before deleting it.
    if (containerId.getId() == 1) {
      try {
        Thread.sleep(500);
      } catch (InterruptedException e) {
        // not a problem, continue anyway
      }
    }

    if (isCpuWeightEnabled()) {
      deleteCgroup(CONTROLLER_CPU, containerName);
    }
  }

  /*
   * LCE Resources Handler interface
   */

  /** Creates the container's cgroup and sets its cpu shares. */
  @Override
  public void preExecute(ContainerId containerId, Resource containerResource)
      throws IOException {
    setupLimits(containerId, containerResource);
  }

  /** Deletes the container's cgroup. */
  @Override
  public void postExecute(ContainerId containerId) {
    clearLimits(containerId);
  }

  /**
   * @return {@code cgroups=} followed by a comma separated list of the
   *         {@code cgroup.procs} files of the container's cgroups
   */
  @Override
  public String getResourcesOption(ContainerId containerId) {
    String containerName = containerId.toString();

    StringBuilder sb = new StringBuilder("cgroups=");

    if (isCpuWeightEnabled()) {
      sb.append(pathForCgroup(CONTROLLER_CPU, containerName) + "/cgroup.procs");
      sb.append(",");
    }

    if (sb.charAt(sb.length() - 1) == ',') {
      sb.deleteCharAt(sb.length() - 1);
    }

    return sb.toString();
  }

  /* We are looking for entries of the form:
   * none /cgroup/path/mem cgroup rw,memory 0 0
   *
   * Use a simple pattern that splits on the five spaces, and
   * grabs the 2, 3, and 4th fields.
   */

  private static final Pattern MTAB_FILE_FORMAT = Pattern.compile(
      "^[^\\s]+\\s([^\\s]+)\\s([^\\s]+)\\s([^\\s]+)\\s[^\\s]+\\s[^\\s]+$");

  /*
   * Returns a map: path -> mount options
   * for mounts with type "cgroup". Cgroup controllers will
   * appear in the list of options for a path.
   */
  private Map<String, List<String>> parseMtab() throws IOException {
    Map<String, List<String>> ret = new HashMap<String, List<String>>();
    BufferedReader in = null;

    try {
      in = new BufferedReader(new FileReader(new File(MTAB_FILE)));

      for (String str = in.readLine(); str != null;
          str = in.readLine()) {
        Matcher m = MTAB_FILE_FORMAT.matcher(str);
        boolean mat = m.find();
        if (mat) {
          String path = m.group(1);
          String type = m.group(2);
          String options = m.group(3);

          if (type.equals(CGROUPS_FSTYPE)) {
            List<String> value = Arrays.asList(options.split(","));
            ret.put(path, value);
          }
        }
      }
    } catch (IOException e) {
      throw new IOException("Error while reading " + MTAB_FILE, e);
    } finally {
      // Close the stream. 'in' is still null when opening the file failed;
      // without this guard the resulting NullPointerException would replace
      // the IOException thrown above.
      if (in != null) {
        try {
          in.close();
        } catch (IOException e2) {
          LOG.warn("Error closing the stream: " + MTAB_FILE, e2);
        }
      }
    }

    return ret;
  }

  /**
   * @return the mount path whose options list contains {@code controller},
   *         or {@code null} if there is none
   */
  private String findControllerInMtab(String controller,
      Map<String, List<String>> entries) {
    for (Entry<String, List<String>> e : entries.entrySet()) {
      if (e.getValue().contains(controller)) {
        return e.getKey();
      }
    }

    return null;
  }

  /**
   * Locates the cpu controller mount and records it, provided the
   * configured hierarchy below it is writable.
   */
  private void initializeControllerPaths() throws IOException {
    String controllerPath;
    Map<String, List<String>> parsedMtab = parseMtab();

    // CPU

    controllerPath = findControllerInMtab(CONTROLLER_CPU, parsedMtab);

    if (controllerPath != null) {
      File f = new File(controllerPath + "/" + this.cgroupPrefix);

      if (f.canWrite()) {
        controllerPaths.put(CONTROLLER_CPU, controllerPath);
      } else {
        throw new IOException("Not able to enforce cpu weights; cannot write "
            + "to cgroup at: " + controllerPath);
      }
    } else {
      throw new IOException("Not able to enforce cpu weights; cannot find "
          + "cgroup for cpu controller in " + MTAB_FILE);
    }
  }
}

View File

@ -0,0 +1,65 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.nodemanager.util;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.server.nodemanager.LinuxContainerExecutor;
/**
 * {@link LCEResourcesHandler} which performs no resource enforcement: every
 * hook is a no-op and the resources option is always {@code cgroups=none}.
 */
public class DefaultLCEResourcesHandler implements LCEResourcesHandler {

  final static Log LOG = LogFactory
      .getLog(DefaultLCEResourcesHandler.class);

  private Configuration conf;

  public DefaultLCEResourcesHandler() {
  }

  @Override
  public void setConf(Configuration conf) {
    this.conf = conf;
  }

  @Override
  public Configuration getConf() {
    return conf;
  }

  /** Nothing to initialize. */
  @Override
  public void init(LinuxContainerExecutor lce) {
  }

  /*
   * LCE Resources Handler interface
   */

  /** Nothing to set up before a container is launched. */
  @Override
  public void preExecute(ContainerId containerId, Resource containerResource) {
  }

  /** Nothing to clean up after a container has exited. */
  @Override
  public void postExecute(ContainerId containerId) {
  }

  /** @return always {@code "cgroups=none"} */
  @Override
  public String getResourcesOption(ContainerId containerId) {
    return "cgroups=none";
  }

}

View File

@ -0,0 +1,49 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.nodemanager.util;
import java.io.IOException;
import org.apache.hadoop.conf.Configurable;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.server.nodemanager.LinuxContainerExecutor;
/**
* Hooks through which the LinuxContainerExecutor delegates resource
* handling for the containers it launches.
*/
public interface LCEResourcesHandler extends Configurable {
/**
* Prepares the handler for use. The LinuxContainerExecutor passes itself
* so that the handler can call back into it.
* @param lce the executor this handler belongs to
* @throws IOException if the handler cannot be initialized
*/
void init(LinuxContainerExecutor lce) throws IOException;
/**
* Called by the LinuxContainerExecutor before launching the executable
* inside the container.
* @param containerId the id of the container being launched
* @param containerResource the node resources the container will be using
*/
void preExecute(ContainerId containerId, Resource containerResource)
throws IOException;
/**
* Called by the LinuxContainerExecutor after the executable inside the
* container has exited (successfully or not).
* @param containerId the id of the container which was launched
*/
void postExecute(ContainerId containerId);
/**
* Returns the resources argument which the LinuxContainerExecutor appends
* to the launch-container command line for the given container.
* @param containerId the id of the container being launched
* @return a string of the form key=value, e.g. "cgroups=none"
*/
String getResourcesOption(ContainerId containerId);
}

View File

@ -308,7 +308,7 @@ char ** extract_values(char *value) {
tempTok = strtok_r(NULL, ",", &tempstr);
}
}
if (size > 0) {
if (toPass != NULL) {
toPass[size] = NULL;
}
return toPass;
@ -323,3 +323,52 @@ void free_values(char** values) {
free(values);
}
}
/**
 * Extract the 'key' part of a "key=val" string.
 *
 * input:   the string to split (the first '=' is the separator)
 * out:     buffer receiving the NUL-terminated key
 * out_len: capacity of out in bytes
 *
 * Returns 0 on success, -EINVAL if input is NULL or has no '=', and
 * -ENAMETOOLONG if out is NULL or too small for the key plus terminator.
 */
int get_kv_key(const char *input, char *out, size_t out_len) {
  const char *separator;
  size_t key_len;

  if (input == NULL) {
    return -EINVAL;
  }

  separator = strchr(input, '=');
  if (separator == NULL) {
    return -EINVAL;
  }

  key_len = (size_t) (separator - input);
  if (out == NULL || out_len < key_len + 1) {
    return -ENAMETOOLONG;
  }

  memcpy(out, input, key_len);
  out[key_len] = '\0';
  return 0;
}
/**
 * Extract the 'val' part of a "key=val" string.
 *
 * input:   the string to split (the first '=' is the separator)
 * out:     buffer receiving the NUL-terminated value
 * out_len: capacity of out in bytes
 *
 * Returns 0 on success, -EINVAL if input is NULL or has no '=', and
 * -ENAMETOOLONG if out is NULL or too small for the value plus terminator.
 */
int get_kv_value(const char *input, char *out, size_t out_len) {
  const char *separator;
  const char *value;
  size_t val_len;

  if (input == NULL) {
    return -EINVAL;
  }

  separator = strchr(input, '=');
  if (separator == NULL) {
    return -EINVAL;
  }

  value = separator + 1;  /* everything after the first '=' */
  val_len = strlen(value);
  if (out == NULL || out_len < val_len + 1) {
    return -ENAMETOOLONG;
  }

  memcpy(out, value, val_len);
  out[val_len] = '\0';
  return 0;
}

View File

@ -16,6 +16,8 @@
* limitations under the License.
*/
#include <stddef.h>
/**
* Ensure that the configuration file and all of the containing directories
* are only writable by root. Otherwise, an attacker can change the
@ -50,3 +52,28 @@ void free_values(char** values);
//method to free allocated configuration
void free_configurations();
/**
* If str is a string of the form key=val, find 'key'
*
* @param input The input string
* @param out Where to put the output string.
* @param out_len The length of the output buffer.
*
* @return -ENAMETOOLONG if out_len is not long enough;
* -EINVAL if there is no equals sign in the input;
* 0 on success
*/
int get_kv_key(const char *input, char *out, size_t out_len);
/**
* If str is a string of the form key=val, find 'val'
*
* @param input The input string
* @param out Where to put the output string.
* @param out_len The length of the output buffer.
*
* @return -ENAMETOOLONG if out_len is not long enough;
* -EINVAL if there is no equals sign in the input;
* 0 on success
*/
int get_kv_value(const char *input, char *out, size_t out_len);

View File

@ -31,6 +31,7 @@
#include <stdlib.h>
#include <string.h>
#include <sys/stat.h>
#include <sys/mount.h>
static const int DEFAULT_MIN_USERID = 1000;
@ -150,6 +151,44 @@ static int change_effective_user(uid_t user, gid_t group) {
return 0;
}
/**
 * Write the given pid to a cgroup file while running with effective uid/gid
 * 0, then switch back to the effective user/group that was active on entry.
 * cgroup_file: Path to cgroup file where pid needs to be written to.
 * pid: the process id to write.
 * Returns 0 on success and -1 on any failure. The original effective
 * user/group is restored on the failure paths as well, so the caller never
 * continues with root privileges by accident.
 */
static int write_pid_to_cgroup_as_root(const char* cgroup_file, pid_t pid) {
  uid_t user = geteuid();
  gid_t group = getegid();
  int result = 0;

  if (change_effective_user(0, 0) != 0) {
    return -1;
  }

  // open
  int cgroup_fd = open(cgroup_file, O_WRONLY | O_APPEND, 0);
  if (cgroup_fd == -1) {
    fprintf(LOGFILE, "Can't open file %s as root - %s\n", cgroup_file,
            strerror(errno));
    result = -1;
  } else {
    // write pid
    char pid_buf[21];
    snprintf(pid_buf, sizeof(pid_buf), "%d", (int) pid);
    ssize_t written = write(cgroup_fd, pid_buf, strlen(pid_buf));
    // remember why write() failed before close() can overwrite errno
    int write_errno = errno;
    close(cgroup_fd);
    if (written == -1) {
      fprintf(LOGFILE, "Failed to write pid to file %s - %s\n",
              cgroup_file, strerror(write_errno));
      result = -1;
    }
  }

  // Revert back to the calling user, whether or not the write succeeded.
  if (change_effective_user(user, group) != 0) {
    result = -1;
  }

  return result;
}
/**
* Write the pid of the current process into the pid file.
* pid_file: Path to pid file where pid needs to be written to
@ -810,7 +849,8 @@ int launch_container_as_user(const char *user, const char *app_id,
const char *container_id, const char *work_dir,
const char *script_name, const char *cred_file,
const char* pid_file, char* const* local_dirs,
char* const* log_dirs) {
char* const* log_dirs, const char *resources_key,
char* const* resources_values) {
int exit_code = -1;
char *script_file_dest = NULL;
char *cred_file_dest = NULL;
@ -849,7 +889,22 @@ int launch_container_as_user(const char *user, const char *app_id,
|| write_pid_to_file_as_nm(pid_file, pid) != 0) {
exit_code = WRITE_PIDFILE_FAILED;
goto cleanup;
}
}
// cgroups-based resource enforcement
if (resources_key != NULL && ! strcmp(resources_key, "cgroups")) {
// write pid to cgroups
char* const* cgroup_ptr;
for (cgroup_ptr = resources_values; cgroup_ptr != NULL &&
*cgroup_ptr != NULL; ++cgroup_ptr) {
if (strcmp(*cgroup_ptr, "none") != 0 &&
write_pid_to_cgroup_as_root(*cgroup_ptr, pid) != 0) {
exit_code = WRITE_CGROUP_FAILED;
goto cleanup;
}
}
}
// give up root privs
if (change_user(user_detail->pw_uid, user_detail->pw_gid) != 0) {
@ -1108,4 +1163,73 @@ int delete_as_user(const char *user,
return ret;
}
/**
 * Change the owner of every entry directly inside dir_path (not recursive).
 * dir_path: the directory whose entries are re-owned
 * uid, gid: the new owner, handed to change_owner() for each entry
 *
 * The "." and ".." entries returned by readdir() are skipped: "." is the
 * directory itself and ".." is its parent, neither of which is part of the
 * directory's contents. Failures (allocation, opendir) are silently ignored.
 */
void chown_dir_contents(const char *dir_path, uid_t uid, gid_t gid) {
  DIR *dp;
  struct dirent *ep;

  // room for dir_path + '/' + longest entry name + terminating NUL
  const size_t path_len = strlen(dir_path) + NAME_MAX + 2;
  char *path_tmp = malloc(path_len);
  if (path_tmp == NULL) {
    return;
  }

  dp = opendir(dir_path);
  if (dp != NULL) {
    while ((ep = readdir(dp)) != NULL) {
      if (strcmp(ep->d_name, ".") == 0 || strcmp(ep->d_name, "..") == 0) {
        continue;
      }
      snprintf(path_tmp, path_len, "%s/%s", dir_path, ep->d_name);
      change_owner(path_tmp, uid, gid);
    }
    closedir(dp);
  }

  free(path_tmp);
}
/**
 * Mount a cgroup controller at the requested mount point and create
 * a hierarchy for the Hadoop NodeManager to manage.
 * pair: a key-value pair of the form "controller=mount-path"
 * hierarchy: the top directory of the hierarchy for the NM
 *
 * Returns 0 on success, and also when the mount fails with EBUSY (the
 * controller is already mounted) so that callers keep mounting the
 * remaining controllers. Returns -1 for invalid arguments, allocation
 * failure, a hierarchy path that does not fit in PATH_MAX, or any other
 * mount failure.
 */
int mount_cgroup(const char *pair, const char *hierarchy) {
  char hier_path[PATH_MAX];
  int result = 0;

  if (pair == NULL || hierarchy == NULL) {
    fprintf(LOGFILE, "Failed to mount cgroup controller; missing argument\n");
    return -1;
  }

  // key and value are each shorter than the whole pair
  const size_t buf_len = strlen(pair) + 1;
  char *controller = malloc(buf_len);
  char *mount_path = malloc(buf_len);

  if (controller == NULL || mount_path == NULL) {
    fprintf(LOGFILE, "Failed to mount cgroup controller; out of memory\n");
    result = -1;
  } else if (get_kv_key(pair, controller, buf_len) < 0 ||
             get_kv_value(pair, mount_path, buf_len) < 0) {
    fprintf(LOGFILE, "Failed to mount cgroup controller; invalid option: %s\n",
              pair);
    result = -1;
  } else {
    // Build <mount_path>/<hierarchy> up front with a bounded write so an
    // over-long mount path cannot run past the end of hier_path.
    int needed = snprintf(hier_path, sizeof(hier_path), "%s/%s",
                          mount_path, hierarchy);
    if (needed < 0 || (size_t) needed >= sizeof(hier_path)) {
      fprintf(LOGFILE, "Failed to mount cgroup controller; path too long: "
              "%s/%s\n", mount_path, hierarchy);
      result = -1;
    } else if (mount("none", mount_path, "cgroup", 0, controller) == 0) {
      // create hierarchy as 0750 and chown to Hadoop NM user
      const mode_t perms = S_IRWXU | S_IRGRP | S_IXGRP;
      if (mkdirs(hier_path, perms) == 0) {
        change_owner(hier_path, nm_uid, nm_gid);
        chown_dir_contents(hier_path, nm_uid, nm_gid);
      }
    } else {
      // capture errno before fprintf() gets a chance to change it
      const int mount_errno = errno;
      fprintf(LOGFILE, "Failed to mount cgroup controller %s at %s - %s\n",
                controller, mount_path, strerror(mount_errno));
      // if controller is already mounted, don't stop trying to mount others
      if (mount_errno != EBUSY) {
        result = -1;
      }
    }
  }

  free(controller);
  free(mount_path);

  return result;
}

View File

@ -53,7 +53,8 @@ enum errorcodes {
// PREPARE_JOB_LOGS_FAILED (NOT USED) 23
INVALID_CONFIG_FILE = 24,
SETSID_OPER_FAILED = 25,
WRITE_PIDFILE_FAILED = 26
WRITE_PIDFILE_FAILED = 26,
WRITE_CGROUP_FAILED = 27
};
#define NM_GROUP_KEY "yarn.nodemanager.linux-container-executor.group"
@ -111,13 +112,16 @@ int initialize_app(const char *user, const char *app_id,
* @param pid_file file where pid of process should be written to
* @param local_dirs nodemanager-local-directories to be used
* @param log_dirs nodemanager-log-directories to be used
* @param resources_key type of resource enforcement (none, cgroups)
* @param resources_value values needed to apply resource enforcement
* @return -1 or errorcode enum value on error (should never return on success).
*/
int launch_container_as_user(const char * user, const char *app_id,
const char *container_id, const char *work_dir,
const char *script_name, const char *cred_file,
const char *pid_file, char* const* local_dirs,
char* const* log_dirs);
char* const* log_dirs, const char *resources_key,
char* const* resources_value);
/**
* Function used to signal a container launched by the user.
@ -196,3 +200,5 @@ int initialize_user(const char *user, char* const* local_dirs);
int create_directory_for_user(const char* path);
int change_user(uid_t user, gid_t group);
int mount_cgroup(const char *pair, const char *hierarchy);

View File

@ -45,6 +45,9 @@
void display_usage(FILE *stream) {
fprintf(stream,
"Usage: container-executor --checksetup\n");
fprintf(stream,
"Usage: container-executor --mount-cgroups "\
"hierarchy controller=path...\n");
fprintf(stream,
"Usage: container-executor user command command-args\n");
fprintf(stream, "Commands:\n");
@ -52,7 +55,7 @@ void display_usage(FILE *stream) {
"nm-local-dirs nm-log-dirs cmd app...\n", INITIALIZE_CONTAINER);
fprintf(stream,
" launch container: %2d appid containerid workdir "\
"container-script tokens pidfile nm-local-dirs nm-log-dirs\n",
"container-script tokens pidfile nm-local-dirs nm-log-dirs resources\n",
LAUNCH_CONTAINER);
fprintf(stream, " signal container: %2d container-pid signal\n",
SIGNAL_CONTAINER);
@ -63,14 +66,21 @@ void display_usage(FILE *stream) {
int main(int argc, char **argv) {
int invalid_args = 0;
int do_check_setup = 0;
int do_mount_cgroups = 0;
LOGFILE = stdout;
ERRORFILE = stderr;
if (argc > 1) {
if (strcmp("--mount-cgroups", argv[1]) == 0) {
do_mount_cgroups = 1;
}
}
// Minimum number of arguments required to run
// the std. container-executor commands is 4
// 4 args not needed for checksetup option
if (argc < 4) {
if (argc < 4 && !do_mount_cgroups) {
invalid_args = 1;
if (argc == 2) {
const char *arg1 = argv[1];
@ -103,6 +113,7 @@ int main(int argc, char **argv) {
char *orig_conf_file = HADOOP_CONF_DIR "/" CONF_FILENAME;
char *conf_file = resolve_config_path(orig_conf_file, argv[0]);
char *local_dirs, *log_dirs;
char *resources, *resources_key, *resources_value;
if (conf_file == NULL) {
fprintf(ERRORFILE, "Configuration file %s not found.\n", orig_conf_file);
@ -145,6 +156,18 @@ int main(int argc, char **argv) {
return 0;
}
if (do_mount_cgroups) {
optind++;
char *hierarchy = argv[optind++];
int result = 0;
while (optind < argc && result == 0) {
result = mount_cgroup(argv[optind++], hierarchy);
}
return result;
}
//checks done for user name
if (argv[optind] == NULL) {
fprintf(ERRORFILE, "Invalid user name.\n");
@ -180,8 +203,8 @@ int main(int argc, char **argv) {
extract_values(log_dirs), argv + optind);
break;
case LAUNCH_CONTAINER:
if (argc != 11) {
fprintf(ERRORFILE, "Too few arguments (%d vs 11) for launch container\n",
if (argc != 12) {
fprintf(ERRORFILE, "Wrong number of arguments (%d vs 12) for launch container\n",
argc);
fflush(ERRORFILE);
return INVALID_ARGUMENT_NUMBER;
@ -194,10 +217,26 @@ int main(int argc, char **argv) {
pid_file = argv[optind++];
local_dirs = argv[optind++];// good local dirs as a comma separated list
log_dirs = argv[optind++];// good log dirs as a comma separated list
resources = argv[optind++];// key,value pair describing resources
char *resources_key = malloc(strlen(resources));
char *resources_value = malloc(strlen(resources));
if (get_kv_key(resources, resources_key, strlen(resources)) < 0 ||
get_kv_value(resources, resources_value, strlen(resources)) < 0) {
fprintf(ERRORFILE, "Invalid arguments for cgroups resources: %s",
resources);
fflush(ERRORFILE);
free(resources_key);
free(resources_value);
return INVALID_ARGUMENT_NUMBER;
}
char** resources_values = extract_values(resources_value);
exit_code = launch_container_as_user(user_detail->pw_name, app_id,
container_id, current_dir, script_file, cred_file,
pid_file, extract_values(local_dirs),
extract_values(log_dirs));
extract_values(log_dirs), resources_key,
resources_values);
free(resources_key);
free(resources_value);
break;
case SIGNAL_CONTAINER:
if (argc != 5) {

View File

@ -39,6 +39,7 @@
static char* username = NULL;
static char* local_dirs = NULL;
static char* log_dirs = NULL;
static char* resources = NULL;
/**
* Run the command using the effective user id.
@ -610,9 +611,17 @@ void test_run_container() {
strerror(errno));
exit(1);
} else if (child == 0) {
char *key = malloc(strlen(resources));
char *value = malloc(strlen(resources));
if (get_kv_key(resources, key, strlen(resources)) < 0 ||
get_kv_value(resources, key, strlen(resources)) < 0) {
printf("FAIL: resources failed - %s\n");
exit(1);
}
if (launch_container_as_user(username, "app_4", "container_1",
container_dir, script_name, TEST_ROOT "/creds.txt", pid_file,
extract_values(local_dirs), extract_values(log_dirs)) != 0) {
extract_values(local_dirs), extract_values(log_dirs),
key, extract_values(value)) != 0) {
printf("FAIL: failed in child\n");
exit(42);
}

View File

@ -126,7 +126,7 @@ public class TestLinuxContainerExecutorWithMocks {
assertEquals(Arrays.asList(appSubmitter, cmd, appId, containerId,
workDir.toString(), "/bin/echo", "/dev/null", pidFile.toString(),
StringUtils.join(",", dirsHandler.getLocalDirs()),
StringUtils.join(",", dirsHandler.getLogDirs())),
StringUtils.join(",", dirsHandler.getLogDirs()), "cgroups=none"),
readMockParams());
}
@ -211,7 +211,8 @@ public class TestLinuxContainerExecutorWithMocks {
assertEquals(Arrays.asList(appSubmitter, cmd, appId, containerId,
workDir.toString(), "/bin/echo", "/dev/null", pidFile.toString(),
StringUtils.join(",", dirsHandler.getLocalDirs()),
StringUtils.join(",", dirsHandler.getLogDirs())), readMockParams());
StringUtils.join(",", dirsHandler.getLogDirs()),
"cgroups=none"), readMockParams());
}