YARN-2440. Enabled Nodemanagers to limit the aggregate cpu usage across all containers to a preconfigured limit. Contributed by Varun Vasudev.

This commit is contained in:
Vinod Kumar Vavilapalli 2014-09-10 19:22:52 -07:00
parent 83be3ad444
commit 4be95175cd
7 changed files with 408 additions and 9 deletions

View File

@ -70,6 +70,9 @@ Release 2.6.0 - UNRELEASED
YARN-415. Capture aggregate memory allocation at the app-level for chargeback. YARN-415. Capture aggregate memory allocation at the app-level for chargeback.
(Eric Payne & Andrey Klochkov via jianhe) (Eric Payne & Andrey Klochkov via jianhe)
YARN-2440. Enabled Nodemanagers to limit the aggregate cpu usage across all
containers to a preconfigured limit. (Varun Vasudev via vinodkv)
IMPROVEMENTS IMPROVEMENTS
YARN-2197. Add a link to YARN CHANGES.txt in the left side of doc YARN-2197. Add a link to YARN CHANGES.txt in the left side of doc

View File

@ -724,6 +724,12 @@ public class YarnConfiguration extends Configuration {
public static final String NM_VCORES = NM_PREFIX + "resource.cpu-vcores"; public static final String NM_VCORES = NM_PREFIX + "resource.cpu-vcores";
public static final int DEFAULT_NM_VCORES = 8; public static final int DEFAULT_NM_VCORES = 8;
/** Percentage of overall CPU which can be allocated for containers. */
public static final String NM_RESOURCE_PERCENTAGE_PHYSICAL_CPU_LIMIT =
NM_PREFIX + "resource.percentage-physical-cpu-limit";
public static final int DEFAULT_NM_RESOURCE_PERCENTAGE_PHYSICAL_CPU_LIMIT =
100;
/** NM Webapp address.**/ /** NM Webapp address.**/
public static final String NM_WEBAPP_ADDRESS = NM_PREFIX + "webapp.address"; public static final String NM_WEBAPP_ADDRESS = NM_PREFIX + "webapp.address";
public static final int DEFAULT_NM_WEBAPP_PORT = 8042; public static final int DEFAULT_NM_WEBAPP_PORT = 8042;

View File

@ -871,12 +871,24 @@
</property> </property>
<property> <property>
<description>Number of CPU cores that can be allocated <description>Number of vcores that can be allocated
for containers.</description> for containers. This is used by the RM scheduler when allocating
resources for containers. This is not used to limit the number of
physical cores used by YARN containers.</description>
<name>yarn.nodemanager.resource.cpu-vcores</name> <name>yarn.nodemanager.resource.cpu-vcores</name>
<value>8</value> <value>8</value>
</property> </property>
<property>
<description>Percentage of CPU that can be allocated
for containers. This setting allows users to limit the amount of
CPU that YARN containers use. Currently functional only
on Linux using cgroups. The default is to use 100% of CPU.
</description>
<name>yarn.nodemanager.resource.percentage-physical-cpu-limit</name>
<value>100</value>
</property>
<property> <property>
<description>NM Webapp address.</description> <description>NM Webapp address.</description>
<name>yarn.nodemanager.webapp.address</name> <name>yarn.nodemanager.webapp.address</name>

View File

@ -33,6 +33,7 @@ import java.util.regex.Matcher;
import java.util.regex.Pattern; import java.util.regex.Pattern;
import com.google.common.annotations.VisibleForTesting; import com.google.common.annotations.VisibleForTesting;
import org.apache.commons.io.FileUtils;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
@ -42,6 +43,7 @@ import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.server.nodemanager.LinuxContainerExecutor; import org.apache.hadoop.yarn.server.nodemanager.LinuxContainerExecutor;
import org.apache.hadoop.yarn.util.Clock; import org.apache.hadoop.yarn.util.Clock;
import org.apache.hadoop.yarn.util.ResourceCalculatorPlugin;
import org.apache.hadoop.yarn.util.SystemClock; import org.apache.hadoop.yarn.util.SystemClock;
public class CgroupsLCEResourcesHandler implements LCEResourcesHandler { public class CgroupsLCEResourcesHandler implements LCEResourcesHandler {
@ -59,7 +61,11 @@ public class CgroupsLCEResourcesHandler implements LCEResourcesHandler {
private final String MTAB_FILE = "/proc/mounts"; private final String MTAB_FILE = "/proc/mounts";
private final String CGROUPS_FSTYPE = "cgroup"; private final String CGROUPS_FSTYPE = "cgroup";
private final String CONTROLLER_CPU = "cpu"; private final String CONTROLLER_CPU = "cpu";
private final String CPU_PERIOD_US = "cfs_period_us";
private final String CPU_QUOTA_US = "cfs_quota_us";
private final int CPU_DEFAULT_WEIGHT = 1024; // set by kernel private final int CPU_DEFAULT_WEIGHT = 1024; // set by kernel
private final int MAX_QUOTA_US = 1000 * 1000;
private final int MIN_PERIOD_US = 1000;
private final Map<String, String> controllerPaths; // Controller -> path private final Map<String, String> controllerPaths; // Controller -> path
private long deleteCgroupTimeout; private long deleteCgroupTimeout;
@ -106,6 +112,13 @@ public class CgroupsLCEResourcesHandler implements LCEResourcesHandler {
} }
public void init(LinuxContainerExecutor lce) throws IOException { public void init(LinuxContainerExecutor lce) throws IOException {
this.init(lce,
ResourceCalculatorPlugin.getResourceCalculatorPlugin(null, conf));
}
@VisibleForTesting
void init(LinuxContainerExecutor lce, ResourceCalculatorPlugin plugin)
throws IOException {
initConfig(); initConfig();
// mount cgroups if requested // mount cgroups if requested
@ -117,8 +130,74 @@ public class CgroupsLCEResourcesHandler implements LCEResourcesHandler {
} }
initializeControllerPaths(); initializeControllerPaths();
// cap overall usage to the number of cores allocated to YARN
float yarnProcessors =
NodeManagerHardwareUtils.getContainersCores(plugin, conf);
int systemProcessors = plugin.getNumProcessors();
if (systemProcessors != (int) yarnProcessors) {
LOG.info("YARN containers restricted to " + yarnProcessors + " cores");
int[] limits = getOverallLimits(yarnProcessors);
updateCgroup(CONTROLLER_CPU, "", CPU_PERIOD_US, String.valueOf(limits[0]));
updateCgroup(CONTROLLER_CPU, "", CPU_QUOTA_US, String.valueOf(limits[1]));
} else if (cpuLimitsExist()) {
LOG.info("Removing CPU constraints for YARN containers.");
updateCgroup(CONTROLLER_CPU, "", CPU_QUOTA_US, String.valueOf(-1));
}
} }
boolean cpuLimitsExist() throws IOException {
String path = pathForCgroup(CONTROLLER_CPU, "");
File quotaFile = new File(path, CONTROLLER_CPU + "." + CPU_QUOTA_US);
if (quotaFile.exists()) {
String contents = FileUtils.readFileToString(quotaFile, "UTF-8");
int quotaUS = Integer.parseInt(contents.trim());
if (quotaUS != -1) {
return true;
}
}
return false;
}
@VisibleForTesting
int[] getOverallLimits(float yarnProcessors) {
int[] ret = new int[2];
if (yarnProcessors < 0.01f) {
throw new IllegalArgumentException("Number of processors can't be <= 0.");
}
int quotaUS = MAX_QUOTA_US;
int periodUS = (int) (MAX_QUOTA_US / yarnProcessors);
if (yarnProcessors < 1.0f) {
periodUS = MAX_QUOTA_US;
quotaUS = (int) (periodUS * yarnProcessors);
if (quotaUS < MIN_PERIOD_US) {
LOG
.warn("The quota calculated for the cgroup was too low. The minimum value is "
+ MIN_PERIOD_US + ", calculated value is " + quotaUS
+ ". Setting quota to minimum value.");
quotaUS = MIN_PERIOD_US;
}
}
// cfs_period_us can't be less than 1000 microseconds
// if the value of periodUS is less than 1000, we can't really use cgroups
// to limit cpu
if (periodUS < MIN_PERIOD_US) {
LOG
.warn("The period calculated for the cgroup was too low. The minimum value is "
+ MIN_PERIOD_US + ", calculated value is " + periodUS
+ ". Using all available CPU.");
periodUS = MAX_QUOTA_US;
quotaUS = -1;
}
ret[0] = periodUS;
ret[1] = quotaUS;
return ret;
}
boolean isCpuWeightEnabled() { boolean isCpuWeightEnabled() {
return this.cpuWeightEnabled; return this.cpuWeightEnabled;
@ -274,7 +353,7 @@ public class CgroupsLCEResourcesHandler implements LCEResourcesHandler {
BufferedReader in = null; BufferedReader in = null;
try { try {
in = new BufferedReader(new FileReader(new File(MTAB_FILE))); in = new BufferedReader(new FileReader(new File(getMtabFileName())));
for (String str = in.readLine(); str != null; for (String str = in.readLine(); str != null;
str = in.readLine()) { str = in.readLine()) {
@ -292,13 +371,13 @@ public class CgroupsLCEResourcesHandler implements LCEResourcesHandler {
} }
} }
} catch (IOException e) { } catch (IOException e) {
throw new IOException("Error while reading " + MTAB_FILE, e); throw new IOException("Error while reading " + getMtabFileName(), e);
} finally { } finally {
// Close the streams // Close the streams
try { try {
in.close(); in.close();
} catch (IOException e2) { } catch (IOException e2) {
LOG.warn("Error closing the stream: " + MTAB_FILE, e2); LOG.warn("Error closing the stream: " + getMtabFileName(), e2);
} }
} }
@ -334,7 +413,12 @@ public class CgroupsLCEResourcesHandler implements LCEResourcesHandler {
} }
} else { } else {
throw new IOException("Not able to enforce cpu weights; cannot find " throw new IOException("Not able to enforce cpu weights; cannot find "
+ "cgroup for cpu controller in " + MTAB_FILE); + "cgroup for cpu controller in " + getMtabFileName());
} }
} }
@VisibleForTesting
String getMtabFileName() {
return MTAB_FILE;
}
} }

View File

@ -0,0 +1,79 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.nodemanager.util;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.util.ResourceCalculatorPlugin;
@InterfaceAudience.Private
@InterfaceStability.Unstable
public class NodeManagerHardwareUtils {
/**
*
* Returns the fraction of CPU cores that should be used for YARN containers.
* The number is derived based on various configuration params such as
* YarnConfiguration.NM_RESOURCE_PERCENTAGE_PHYSICAL_CPU_LIMIT
*
* @param conf
* - Configuration object
* @return Fraction of CPU cores to be used for YARN containers
*/
public static float getContainersCores(Configuration conf) {
ResourceCalculatorPlugin plugin =
ResourceCalculatorPlugin.getResourceCalculatorPlugin(null, conf);
return NodeManagerHardwareUtils.getContainersCores(plugin, conf);
}
/**
*
* Returns the fraction of CPU cores that should be used for YARN containers.
* The number is derived based on various configuration params such as
* YarnConfiguration.NM_RESOURCE_PERCENTAGE_PHYSICAL_CPU_LIMIT
*
* @param plugin
* - ResourceCalculatorPlugin object to determine hardware specs
* @param conf
* - Configuration object
* @return Fraction of CPU cores to be used for YARN containers
*/
public static float getContainersCores(ResourceCalculatorPlugin plugin,
Configuration conf) {
int numProcessors = plugin.getNumProcessors();
int nodeCpuPercentage =
Math.min(conf.getInt(
YarnConfiguration.NM_RESOURCE_PERCENTAGE_PHYSICAL_CPU_LIMIT,
YarnConfiguration.DEFAULT_NM_RESOURCE_PERCENTAGE_PHYSICAL_CPU_LIMIT),
100);
nodeCpuPercentage = Math.max(0, nodeCpuPercentage);
if (nodeCpuPercentage == 0) {
String message =
"Illegal value for "
+ YarnConfiguration.NM_RESOURCE_PERCENTAGE_PHYSICAL_CPU_LIMIT
+ ". Value cannot be less than or equal to 0.";
throw new IllegalArgumentException(message);
}
return (nodeCpuPercentage * numProcessors) / 100.0f;
}
}

View File

@ -17,13 +17,18 @@
*/ */
package org.apache.hadoop.yarn.server.nodemanager.util; package org.apache.hadoop.yarn.server.nodemanager.util;
import org.apache.commons.io.FileUtils;
import org.apache.hadoop.yarn.server.nodemanager.LinuxContainerExecutor;
import org.apache.hadoop.yarn.util.ResourceCalculatorPlugin;
import org.junit.Assert; import org.junit.Assert;
import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.util.Clock; import org.apache.hadoop.yarn.util.Clock;
import org.junit.Test; import org.junit.Test;
import org.mockito.Mockito;
import java.io.File; import java.io.*;
import java.io.FileOutputStream; import java.util.List;
import java.util.Scanner;
import java.util.UUID; import java.util.UUID;
import java.util.concurrent.CountDownLatch; import java.util.concurrent.CountDownLatch;
@ -70,4 +75,142 @@ public class TestCgroupsLCEResourcesHandler {
Assert.assertFalse(handler.deleteCgroup(file.getPath())); Assert.assertFalse(handler.deleteCgroup(file.getPath()));
} }
static class MockLinuxContainerExecutor extends LinuxContainerExecutor {
@Override
public void mountCgroups(List<String> x, String y) {
}
}
static class CustomCgroupsLCEResourceHandler extends
CgroupsLCEResourcesHandler {
String mtabFile;
int[] limits = new int[2];
@Override
int[] getOverallLimits(float x) {
return limits;
}
void setMtabFile(String file) {
mtabFile = file;
}
@Override
String getMtabFileName() {
return mtabFile;
}
}
@Test
public void testInit() throws IOException {
LinuxContainerExecutor mockLCE = new MockLinuxContainerExecutor();
CustomCgroupsLCEResourceHandler handler =
new CustomCgroupsLCEResourceHandler();
YarnConfiguration conf = new YarnConfiguration();
final int numProcessors = 4;
ResourceCalculatorPlugin plugin =
Mockito.mock(ResourceCalculatorPlugin.class);
Mockito.doReturn(numProcessors).when(plugin).getNumProcessors();
handler.setConf(conf);
handler.initConfig();
// create mock cgroup
File cgroupDir = new File("target", UUID.randomUUID().toString());
if (!cgroupDir.mkdir()) {
String message = "Could not create dir " + cgroupDir.getAbsolutePath();
throw new IOException(message);
}
File cgroupMountDir = new File(cgroupDir.getAbsolutePath(), "hadoop-yarn");
if (!cgroupMountDir.mkdir()) {
String message =
"Could not create dir " + cgroupMountDir.getAbsolutePath();
throw new IOException(message);
}
// create mock mtab
String mtabContent =
"none " + cgroupDir.getAbsolutePath() + " cgroup rw,relatime,cpu 0 0";
File mockMtab = new File("target", UUID.randomUUID().toString());
if (!mockMtab.exists()) {
if (!mockMtab.createNewFile()) {
String message = "Could not create file " + mockMtab.getAbsolutePath();
throw new IOException(message);
}
}
FileWriter mtabWriter = new FileWriter(mockMtab.getAbsoluteFile());
mtabWriter.write(mtabContent);
mtabWriter.close();
mockMtab.deleteOnExit();
// setup our handler and call init()
handler.setMtabFile(mockMtab.getAbsolutePath());
// check values
// in this case, we're using all cpu so the files
// shouldn't exist(because init won't create them
handler.init(mockLCE, plugin);
File periodFile = new File(cgroupMountDir, "cpu.cfs_period_us");
File quotaFile = new File(cgroupMountDir, "cpu.cfs_quota_us");
Assert.assertFalse(periodFile.exists());
Assert.assertFalse(quotaFile.exists());
// subset of cpu being used, files should be created
conf.setInt(YarnConfiguration.NM_RESOURCE_PERCENTAGE_PHYSICAL_CPU_LIMIT, 75);
handler.limits[0] = 100 * 1000;
handler.limits[1] = 1000 * 1000;
handler.init(mockLCE, plugin);
int period = readIntFromFile(periodFile);
int quota = readIntFromFile(quotaFile);
Assert.assertEquals(100 * 1000, period);
Assert.assertEquals(1000 * 1000, quota);
// set cpu back to 100, quota should be -1
conf.setInt(YarnConfiguration.NM_RESOURCE_PERCENTAGE_PHYSICAL_CPU_LIMIT, 100);
handler.limits[0] = 100 * 1000;
handler.limits[1] = 1000 * 1000;
handler.init(mockLCE, plugin);
quota = readIntFromFile(quotaFile);
Assert.assertEquals(-1, quota);
FileUtils.deleteQuietly(cgroupDir);
}
private int readIntFromFile(File targetFile) throws IOException {
Scanner scanner = new Scanner(targetFile);
if (scanner.hasNextInt()) {
return scanner.nextInt();
}
return -1;
}
@Test
public void testGetOverallLimits() {
int expectedQuota = 1000 * 1000;
CgroupsLCEResourcesHandler handler = new CgroupsLCEResourcesHandler();
int[] ret = handler.getOverallLimits(2);
Assert.assertEquals(expectedQuota / 2, ret[0]);
Assert.assertEquals(expectedQuota, ret[1]);
ret = handler.getOverallLimits(2000);
Assert.assertEquals(expectedQuota, ret[0]);
Assert.assertEquals(-1, ret[1]);
int[] params = { 0, -1 };
for (int cores : params) {
try {
handler.getOverallLimits(cores);
Assert.fail("Function call should throw error.");
} catch (IllegalArgumentException ie) {
// expected
}
}
// test minimums
ret = handler.getOverallLimits(1000 * 1000);
Assert.assertEquals(1000 * 1000, ret[0]);
Assert.assertEquals(-1, ret[1]);
}
} }

View File

@ -0,0 +1,72 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.nodemanager.util;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.util.ResourceCalculatorPlugin;
import org.junit.Assert;
import org.junit.Test;
import org.mockito.Mockito;
public class TestNodeManagerHardwareUtils {
@Test
public void testGetContainerCores() {
YarnConfiguration conf = new YarnConfiguration();
float ret;
final int numProcessors = 4;
ResourceCalculatorPlugin plugin =
Mockito.mock(ResourceCalculatorPlugin.class);
Mockito.doReturn(numProcessors).when(plugin).getNumProcessors();
conf.setInt(YarnConfiguration.NM_RESOURCE_PERCENTAGE_PHYSICAL_CPU_LIMIT, 0);
try {
NodeManagerHardwareUtils.getContainersCores(plugin, conf);
Assert.fail("getContainerCores should have thrown exception");
} catch (IllegalArgumentException ie) {
// expected
}
conf.setInt(YarnConfiguration.NM_RESOURCE_PERCENTAGE_PHYSICAL_CPU_LIMIT,
100);
ret = NodeManagerHardwareUtils.getContainersCores(plugin, conf);
Assert.assertEquals(4, (int) ret);
conf
.setInt(YarnConfiguration.NM_RESOURCE_PERCENTAGE_PHYSICAL_CPU_LIMIT, 50);
ret = NodeManagerHardwareUtils.getContainersCores(plugin, conf);
Assert.assertEquals(2, (int) ret);
conf
.setInt(YarnConfiguration.NM_RESOURCE_PERCENTAGE_PHYSICAL_CPU_LIMIT, 75);
ret = NodeManagerHardwareUtils.getContainersCores(plugin, conf);
Assert.assertEquals(3, (int) ret);
conf
.setInt(YarnConfiguration.NM_RESOURCE_PERCENTAGE_PHYSICAL_CPU_LIMIT, 85);
ret = NodeManagerHardwareUtils.getContainersCores(plugin, conf);
Assert.assertEquals(3.4, ret, 0.1);
conf.setInt(YarnConfiguration.NM_RESOURCE_PERCENTAGE_PHYSICAL_CPU_LIMIT,
110);
ret = NodeManagerHardwareUtils.getContainersCores(plugin, conf);
Assert.assertEquals(4, (int) ret);
}
}