YARN-2531. Added a configuration for admins to be able to override app-configs and enforce/not-enforce strict control of per-container cpu usage. Contributed by Varun Vasudev.

(cherry picked from commit 9f6891d9ef)
This commit is contained in:
Vinod Kumar Vavilapalli 2014-09-16 10:14:46 -07:00
parent 08abb320a7
commit 9d34dc87e1
5 changed files with 189 additions and 29 deletions

View File

@ -57,6 +57,10 @@ Release 2.6.0 - UNRELEASED
failures should be ignored towards counting max-attempts. (Xuan Gong via failures should be ignored towards counting max-attempts. (Xuan Gong via
vinodkv) vinodkv)
YARN-2531. Added a configuration for admins to be able to override app-configs
and enforce/not-enforce strict control of per-container cpu usage. (Varun
Vasudev via vinodkv)
IMPROVEMENTS IMPROVEMENTS
YARN-2242. Improve exception information on AM launch crashes. (Li Lu YARN-2242. Improve exception information on AM launch crashes. (Li Lu

View File

@ -902,6 +902,16 @@ public class YarnConfiguration extends Configuration {
public static final String NM_LINUX_CONTAINER_CGROUPS_MOUNT_PATH = public static final String NM_LINUX_CONTAINER_CGROUPS_MOUNT_PATH =
NM_PREFIX + "linux-container-executor.cgroups.mount-path"; NM_PREFIX + "linux-container-executor.cgroups.mount-path";
/**
* Whether the apps should run in strict resource usage mode(not allowed to
* use spare CPU)
*/
public static final String NM_LINUX_CONTAINER_CGROUPS_STRICT_RESOURCE_USAGE =
NM_PREFIX + "linux-container-executor.cgroups.strict-resource-usage";
public static final boolean DEFAULT_NM_LINUX_CONTAINER_CGROUPS_STRICT_RESOURCE_USAGE =
false;
/** /**
* Interval of time the linux container executor should try cleaning up * Interval of time the linux container executor should try cleaning up

View File

@ -1038,6 +1038,16 @@
<value>^[_.A-Za-z0-9][-@_.A-Za-z0-9]{0,255}?[$]?$</value> <value>^[_.A-Za-z0-9][-@_.A-Za-z0-9]{0,255}?[$]?$</value>
</property> </property>
<property>
<description>This flag determines whether apps should run with strict resource limits
or be allowed to consume spare resources if they need them. For example, turning the
flag on will restrict apps to use only their share of CPU, even if the node has spare
CPU cycles. The default value is false i.e. use available resources. Please note that
turning this flag on may reduce job throughput on the cluster.</description>
<name>yarn.nodemanager.linux-container-executor.cgroups.strict-resource-usage</name>
<value>false</value>
</property>
<property> <property>
<description>T-file compression types used to compress aggregated logs.</description> <description>T-file compression types used to compress aggregated logs.</description>
<name>yarn.nodemanager.log-aggregation.compression-type</name> <name>yarn.nodemanager.log-aggregation.compression-type</name>

View File

@ -57,6 +57,7 @@ public class CgroupsLCEResourcesHandler implements LCEResourcesHandler {
private String cgroupMountPath; private String cgroupMountPath;
private boolean cpuWeightEnabled = true; private boolean cpuWeightEnabled = true;
private boolean strictResourceUsageMode = false;
private final String MTAB_FILE = "/proc/mounts"; private final String MTAB_FILE = "/proc/mounts";
private final String CGROUPS_FSTYPE = "cgroup"; private final String CGROUPS_FSTYPE = "cgroup";
@ -72,6 +73,8 @@ public class CgroupsLCEResourcesHandler implements LCEResourcesHandler {
// package private for testing purposes // package private for testing purposes
Clock clock; Clock clock;
private float yarnProcessors;
public CgroupsLCEResourcesHandler() { public CgroupsLCEResourcesHandler() {
this.controllerPaths = new HashMap<String, String>(); this.controllerPaths = new HashMap<String, String>();
clock = new SystemClock(); clock = new SystemClock();
@ -105,6 +108,12 @@ public class CgroupsLCEResourcesHandler implements LCEResourcesHandler {
cgroupPrefix = cgroupPrefix.substring(1); cgroupPrefix = cgroupPrefix.substring(1);
} }
this.strictResourceUsageMode =
conf
.getBoolean(
YarnConfiguration.NM_LINUX_CONTAINER_CGROUPS_STRICT_RESOURCE_USAGE,
YarnConfiguration.DEFAULT_NM_LINUX_CONTAINER_CGROUPS_STRICT_RESOURCE_USAGE);
int len = cgroupPrefix.length(); int len = cgroupPrefix.length();
if (cgroupPrefix.charAt(len - 1) == '/') { if (cgroupPrefix.charAt(len - 1) == '/') {
cgroupPrefix = cgroupPrefix.substring(0, len - 1); cgroupPrefix = cgroupPrefix.substring(0, len - 1);
@ -132,8 +141,7 @@ public class CgroupsLCEResourcesHandler implements LCEResourcesHandler {
initializeControllerPaths(); initializeControllerPaths();
// cap overall usage to the number of cores allocated to YARN // cap overall usage to the number of cores allocated to YARN
float yarnProcessors = yarnProcessors = NodeManagerHardwareUtils.getContainersCores(plugin, conf);
NodeManagerHardwareUtils.getContainersCores(plugin, conf);
int systemProcessors = plugin.getNumProcessors(); int systemProcessors = plugin.getNumProcessors();
if (systemProcessors != (int) yarnProcessors) { if (systemProcessors != (int) yarnProcessors) {
LOG.info("YARN containers restricted to " + yarnProcessors + " cores"); LOG.info("YARN containers restricted to " + yarnProcessors + " cores");
@ -290,10 +298,25 @@ public class CgroupsLCEResourcesHandler implements LCEResourcesHandler {
String containerName = containerId.toString(); String containerName = containerId.toString();
if (isCpuWeightEnabled()) { if (isCpuWeightEnabled()) {
int containerVCores = containerResource.getVirtualCores();
createCgroup(CONTROLLER_CPU, containerName); createCgroup(CONTROLLER_CPU, containerName);
int cpuShares = CPU_DEFAULT_WEIGHT * containerResource.getVirtualCores(); int cpuShares = CPU_DEFAULT_WEIGHT * containerVCores;
updateCgroup(CONTROLLER_CPU, containerName, "shares", updateCgroup(CONTROLLER_CPU, containerName, "shares",
String.valueOf(cpuShares)); String.valueOf(cpuShares));
if (strictResourceUsageMode) {
int nodeVCores =
conf.getInt(YarnConfiguration.NM_VCORES,
YarnConfiguration.DEFAULT_NM_VCORES);
if (nodeVCores != containerVCores) {
float containerCPU =
(containerVCores * yarnProcessors) / (float) nodeVCores;
int[] limits = getOverallLimits(containerCPU);
updateCgroup(CONTROLLER_CPU, containerName, CPU_PERIOD_US,
String.valueOf(limits[0]));
updateCgroup(CONTROLLER_CPU, containerName, CPU_QUOTA_US,
String.valueOf(limits[1]));
}
}
} }
} }

View File

@ -18,6 +18,8 @@
package org.apache.hadoop.yarn.server.nodemanager.util; package org.apache.hadoop.yarn.server.nodemanager.util;
import org.apache.commons.io.FileUtils; import org.apache.commons.io.FileUtils;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.server.nodemanager.LinuxContainerExecutor; import org.apache.hadoop.yarn.server.nodemanager.LinuxContainerExecutor;
import org.apache.hadoop.yarn.util.ResourceCalculatorPlugin; import org.apache.hadoop.yarn.util.ResourceCalculatorPlugin;
import org.junit.Assert; import org.junit.Assert;
@ -86,9 +88,13 @@ public class TestCgroupsLCEResourcesHandler {
String mtabFile; String mtabFile;
int[] limits = new int[2]; int[] limits = new int[2];
boolean generateLimitsMode = false;
@Override @Override
int[] getOverallLimits(float x) { int[] getOverallLimits(float x) {
if (generateLimitsMode == true) {
return super.getOverallLimits(x);
}
return limits; return limits;
} }
@ -116,32 +122,11 @@ public class TestCgroupsLCEResourcesHandler {
handler.initConfig(); handler.initConfig();
// create mock cgroup // create mock cgroup
File cgroupDir = new File("target", UUID.randomUUID().toString()); File cgroupDir = createMockCgroup();
if (!cgroupDir.mkdir()) { File cgroupMountDir = createMockCgroupMount(cgroupDir);
String message = "Could not create dir " + cgroupDir.getAbsolutePath();
throw new IOException(message);
}
File cgroupMountDir = new File(cgroupDir.getAbsolutePath(), "hadoop-yarn");
if (!cgroupMountDir.mkdir()) {
String message =
"Could not create dir " + cgroupMountDir.getAbsolutePath();
throw new IOException(message);
}
// create mock mtab // create mock mtab
String mtabContent = File mockMtab = createMockMTab(cgroupDir);
"none " + cgroupDir.getAbsolutePath() + " cgroup rw,relatime,cpu 0 0";
File mockMtab = new File("target", UUID.randomUUID().toString());
if (!mockMtab.exists()) {
if (!mockMtab.createNewFile()) {
String message = "Could not create file " + mockMtab.getAbsolutePath();
throw new IOException(message);
}
}
FileWriter mtabWriter = new FileWriter(mockMtab.getAbsoluteFile());
mtabWriter.write(mtabContent);
mtabWriter.close();
mockMtab.deleteOnExit();
// setup our handler and call init() // setup our handler and call init()
handler.setMtabFile(mockMtab.getAbsolutePath()); handler.setMtabFile(mockMtab.getAbsolutePath());
@ -156,7 +141,8 @@ public class TestCgroupsLCEResourcesHandler {
Assert.assertFalse(quotaFile.exists()); Assert.assertFalse(quotaFile.exists());
// subset of cpu being used, files should be created // subset of cpu being used, files should be created
conf.setInt(YarnConfiguration.NM_RESOURCE_PERCENTAGE_PHYSICAL_CPU_LIMIT, 75); conf
.setInt(YarnConfiguration.NM_RESOURCE_PERCENTAGE_PHYSICAL_CPU_LIMIT, 75);
handler.limits[0] = 100 * 1000; handler.limits[0] = 100 * 1000;
handler.limits[1] = 1000 * 1000; handler.limits[1] = 1000 * 1000;
handler.init(mockLCE, plugin); handler.init(mockLCE, plugin);
@ -166,7 +152,8 @@ public class TestCgroupsLCEResourcesHandler {
Assert.assertEquals(1000 * 1000, quota); Assert.assertEquals(1000 * 1000, quota);
// set cpu back to 100, quota should be -1 // set cpu back to 100, quota should be -1
conf.setInt(YarnConfiguration.NM_RESOURCE_PERCENTAGE_PHYSICAL_CPU_LIMIT, 100); conf.setInt(YarnConfiguration.NM_RESOURCE_PERCENTAGE_PHYSICAL_CPU_LIMIT,
100);
handler.limits[0] = 100 * 1000; handler.limits[0] = 100 * 1000;
handler.limits[1] = 1000 * 1000; handler.limits[1] = 1000 * 1000;
handler.init(mockLCE, plugin); handler.init(mockLCE, plugin);
@ -213,4 +200,130 @@ public class TestCgroupsLCEResourcesHandler {
Assert.assertEquals(1000 * 1000, ret[0]); Assert.assertEquals(1000 * 1000, ret[0]);
Assert.assertEquals(-1, ret[1]); Assert.assertEquals(-1, ret[1]);
} }
private File createMockCgroup() throws IOException {
File cgroupDir = new File("target", UUID.randomUUID().toString());
if (!cgroupDir.mkdir()) {
String message = "Could not create dir " + cgroupDir.getAbsolutePath();
throw new IOException(message);
}
return cgroupDir;
}
private File createMockCgroupMount(File cgroupDir) throws IOException {
File cgroupMountDir = new File(cgroupDir.getAbsolutePath(), "hadoop-yarn");
if (!cgroupMountDir.mkdir()) {
String message =
"Could not create dir " + cgroupMountDir.getAbsolutePath();
throw new IOException(message);
}
return cgroupMountDir;
}
private File createMockMTab(File cgroupDir) throws IOException {
String mtabContent =
"none " + cgroupDir.getAbsolutePath() + " cgroup rw,relatime,cpu 0 0";
File mockMtab = new File("target", UUID.randomUUID().toString());
if (!mockMtab.exists()) {
if (!mockMtab.createNewFile()) {
String message = "Could not create file " + mockMtab.getAbsolutePath();
throw new IOException(message);
}
}
FileWriter mtabWriter = new FileWriter(mockMtab.getAbsoluteFile());
mtabWriter.write(mtabContent);
mtabWriter.close();
mockMtab.deleteOnExit();
return mockMtab;
}
@Test
public void testContainerLimits() throws IOException {
LinuxContainerExecutor mockLCE = new MockLinuxContainerExecutor();
CustomCgroupsLCEResourceHandler handler =
new CustomCgroupsLCEResourceHandler();
handler.generateLimitsMode = true;
YarnConfiguration conf = new YarnConfiguration();
final int numProcessors = 4;
ResourceCalculatorPlugin plugin =
Mockito.mock(ResourceCalculatorPlugin.class);
Mockito.doReturn(numProcessors).when(plugin).getNumProcessors();
handler.setConf(conf);
handler.initConfig();
// create mock cgroup
File cgroupDir = createMockCgroup();
File cgroupMountDir = createMockCgroupMount(cgroupDir);
// create mock mtab
File mockMtab = createMockMTab(cgroupDir);
// setup our handler and call init()
handler.setMtabFile(mockMtab.getAbsolutePath());
handler.init(mockLCE, plugin);
// check values
// default case - files shouldn't exist, strict mode off by default
ContainerId id = ContainerId.fromString("container_1_1_1_1");
handler.preExecute(id, Resource.newInstance(1024, 1));
File containerDir = new File(cgroupMountDir, id.toString());
Assert.assertTrue(containerDir.exists());
Assert.assertTrue(containerDir.isDirectory());
File periodFile = new File(containerDir, "cpu.cfs_period_us");
File quotaFile = new File(containerDir, "cpu.cfs_quota_us");
Assert.assertFalse(periodFile.exists());
Assert.assertFalse(quotaFile.exists());
// no files created because we're using all cpu
FileUtils.deleteQuietly(containerDir);
conf.setBoolean(
YarnConfiguration.NM_LINUX_CONTAINER_CGROUPS_STRICT_RESOURCE_USAGE, true);
handler.initConfig();
handler.preExecute(id,
Resource.newInstance(1024, YarnConfiguration.DEFAULT_NM_VCORES));
Assert.assertTrue(containerDir.exists());
Assert.assertTrue(containerDir.isDirectory());
periodFile = new File(containerDir, "cpu.cfs_period_us");
quotaFile = new File(containerDir, "cpu.cfs_quota_us");
Assert.assertFalse(periodFile.exists());
Assert.assertFalse(quotaFile.exists());
// 50% of CPU
FileUtils.deleteQuietly(containerDir);
conf.setBoolean(
YarnConfiguration.NM_LINUX_CONTAINER_CGROUPS_STRICT_RESOURCE_USAGE, true);
handler.initConfig();
handler.preExecute(id,
Resource.newInstance(1024, YarnConfiguration.DEFAULT_NM_VCORES / 2));
Assert.assertTrue(containerDir.exists());
Assert.assertTrue(containerDir.isDirectory());
periodFile = new File(containerDir, "cpu.cfs_period_us");
quotaFile = new File(containerDir, "cpu.cfs_quota_us");
Assert.assertTrue(periodFile.exists());
Assert.assertTrue(quotaFile.exists());
Assert.assertEquals(500 * 1000, readIntFromFile(periodFile));
Assert.assertEquals(1000 * 1000, readIntFromFile(quotaFile));
// CGroups set to 50% of CPU, container set to 50% of YARN CPU
FileUtils.deleteQuietly(containerDir);
conf.setBoolean(
YarnConfiguration.NM_LINUX_CONTAINER_CGROUPS_STRICT_RESOURCE_USAGE, true);
conf
.setInt(YarnConfiguration.NM_RESOURCE_PERCENTAGE_PHYSICAL_CPU_LIMIT, 50);
handler.initConfig();
handler.init(mockLCE, plugin);
handler.preExecute(id,
Resource.newInstance(1024, YarnConfiguration.DEFAULT_NM_VCORES / 2));
Assert.assertTrue(containerDir.exists());
Assert.assertTrue(containerDir.isDirectory());
periodFile = new File(containerDir, "cpu.cfs_period_us");
quotaFile = new File(containerDir, "cpu.cfs_quota_us");
Assert.assertTrue(periodFile.exists());
Assert.assertTrue(quotaFile.exists());
Assert.assertEquals(1000 * 1000, readIntFromFile(periodFile));
Assert.assertEquals(1000 * 1000, readIntFromFile(quotaFile));
FileUtils.deleteQuietly(cgroupDir);
}
} }