diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt index b28f8f823fa..6a871a58a8b 100644 --- a/hadoop-yarn-project/CHANGES.txt +++ b/hadoop-yarn-project/CHANGES.txt @@ -70,6 +70,9 @@ Release 2.6.0 - UNRELEASED YARN-415. Capture aggregate memory allocation at the app-level for chargeback. (Eric Payne & Andrey Klochkov via jianhe) + YARN-2440. Enabled Nodemanagers to limit the aggregate cpu usage across all + containers to a preconfigured limit. (Varun Vasudev via vinodkv) + IMPROVEMENTS YARN-2197. Add a link to YARN CHANGES.txt in the left side of doc diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java index 7b7511d20b2..7c71a1717f1 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java @@ -723,6 +723,12 @@ public class YarnConfiguration extends Configuration { /** Number of Virtual CPU Cores which can be allocated for containers.*/ public static final String NM_VCORES = NM_PREFIX + "resource.cpu-vcores"; public static final int DEFAULT_NM_VCORES = 8; + + /** Percentage of overall CPU which can be allocated for containers. */ + public static final String NM_RESOURCE_PERCENTAGE_PHYSICAL_CPU_LIMIT = + NM_PREFIX + "resource.percentage-physical-cpu-limit"; + public static final int DEFAULT_NM_RESOURCE_PERCENTAGE_PHYSICAL_CPU_LIMIT = + 100; /** NM Webapp address.**/ public static final String NM_WEBAPP_ADDRESS = NM_PREFIX + "webapp.address"; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml index 9b4a90f4790..04e458cd722 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml @@ -871,12 +871,24 @@ - Number of CPU cores that can be allocated - for containers. + Number of vcores that can be allocated + for containers. This is used by the RM scheduler when allocating + resources for containers. This is not used to limit the number of + physical cores used by YARN containers. yarn.nodemanager.resource.cpu-vcores 8 + + Percentage of CPU that can be allocated + for containers. This setting allows users to limit the amount of + CPU that YARN containers use. Currently functional only + on Linux using cgroups. The default is to use 100% of CPU. + + yarn.nodemanager.resource.percentage-physical-cpu-limit + 100 + + NM Webapp address. yarn.nodemanager.webapp.address diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/util/CgroupsLCEResourcesHandler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/util/CgroupsLCEResourcesHandler.java index d5bd22540f3..0b6c2ac60b3 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/util/CgroupsLCEResourcesHandler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/util/CgroupsLCEResourcesHandler.java @@ -33,6 +33,7 @@ import java.util.regex.Matcher; import java.util.regex.Pattern; import com.google.common.annotations.VisibleForTesting; +import org.apache.commons.io.FileUtils; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; @@ -42,6 +43,7 @@ import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.server.nodemanager.LinuxContainerExecutor; import org.apache.hadoop.yarn.util.Clock; +import org.apache.hadoop.yarn.util.ResourceCalculatorPlugin; import org.apache.hadoop.yarn.util.SystemClock; public class CgroupsLCEResourcesHandler implements LCEResourcesHandler { @@ -59,7 +61,11 @@ public class CgroupsLCEResourcesHandler implements LCEResourcesHandler { private final String MTAB_FILE = "/proc/mounts"; private final String CGROUPS_FSTYPE = "cgroup"; private final String CONTROLLER_CPU = "cpu"; + private final String CPU_PERIOD_US = "cfs_period_us"; + private final String CPU_QUOTA_US = "cfs_quota_us"; private final int CPU_DEFAULT_WEIGHT = 1024; // set by kernel + private final int MAX_QUOTA_US = 1000 * 1000; + private final int MIN_PERIOD_US = 1000; private final Map controllerPaths; // Controller -> path private long deleteCgroupTimeout; @@ -106,8 +112,15 @@ public class CgroupsLCEResourcesHandler implements LCEResourcesHandler { } public void init(LinuxContainerExecutor lce) throws IOException { + this.init(lce, + ResourceCalculatorPlugin.getResourceCalculatorPlugin(null, conf)); + } + + @VisibleForTesting + void init(LinuxContainerExecutor lce, ResourceCalculatorPlugin plugin) + throws IOException { initConfig(); - + // mount cgroups if requested if (cgroupMount && cgroupMountPath != null) { ArrayList cgroupKVs = new ArrayList(); @@ -117,8 +130,74 @@ public class CgroupsLCEResourcesHandler implements LCEResourcesHandler { } initializeControllerPaths(); + + // cap overall usage to the number of cores allocated to YARN + float yarnProcessors = + NodeManagerHardwareUtils.getContainersCores(plugin, conf); + int systemProcessors = plugin.getNumProcessors(); + if (systemProcessors != (int) yarnProcessors) { + LOG.info("YARN containers restricted to " + yarnProcessors + " cores"); + int[] limits = getOverallLimits(yarnProcessors); + updateCgroup(CONTROLLER_CPU, "", CPU_PERIOD_US, String.valueOf(limits[0])); + updateCgroup(CONTROLLER_CPU, "", CPU_QUOTA_US, String.valueOf(limits[1])); + } else if (cpuLimitsExist()) { + LOG.info("Removing CPU constraints for YARN containers."); + updateCgroup(CONTROLLER_CPU, "", CPU_QUOTA_US, String.valueOf(-1)); + } } + boolean cpuLimitsExist() throws IOException { + String path = pathForCgroup(CONTROLLER_CPU, ""); + File quotaFile = new File(path, CONTROLLER_CPU + "." + CPU_QUOTA_US); + if (quotaFile.exists()) { + String contents = FileUtils.readFileToString(quotaFile, "UTF-8"); + int quotaUS = Integer.parseInt(contents.trim()); + if (quotaUS != -1) { + return true; + } + } + return false; + } + + @VisibleForTesting + int[] getOverallLimits(float yarnProcessors) { + + int[] ret = new int[2]; + + if (yarnProcessors < 0.01f) { + throw new IllegalArgumentException("Number of processors can't be <= 0."); + } + + int quotaUS = MAX_QUOTA_US; + int periodUS = (int) (MAX_QUOTA_US / yarnProcessors); + if (yarnProcessors < 1.0f) { + periodUS = MAX_QUOTA_US; + quotaUS = (int) (periodUS * yarnProcessors); + if (quotaUS < MIN_PERIOD_US) { + LOG + .warn("The quota calculated for the cgroup was too low. The minimum value is " + + MIN_PERIOD_US + ", calculated value is " + quotaUS + + ". Setting quota to minimum value."); + quotaUS = MIN_PERIOD_US; + } + } + + // cfs_period_us can't be less than 1000 microseconds + // if the value of periodUS is less than 1000, we can't really use cgroups + // to limit cpu + if (periodUS < MIN_PERIOD_US) { + LOG + .warn("The period calculated for the cgroup was too low. The minimum value is " + + MIN_PERIOD_US + ", calculated value is " + periodUS + + ". Using all available CPU."); + periodUS = MAX_QUOTA_US; + quotaUS = -1; + } + + ret[0] = periodUS; + ret[1] = quotaUS; + return ret; + } boolean isCpuWeightEnabled() { return this.cpuWeightEnabled; @@ -274,7 +353,7 @@ public class CgroupsLCEResourcesHandler implements LCEResourcesHandler { BufferedReader in = null; try { - in = new BufferedReader(new FileReader(new File(MTAB_FILE))); + in = new BufferedReader(new FileReader(new File(getMtabFileName()))); for (String str = in.readLine(); str != null; str = in.readLine()) { @@ -292,13 +371,13 @@ public class CgroupsLCEResourcesHandler implements LCEResourcesHandler { } } } catch (IOException e) { - throw new IOException("Error while reading " + MTAB_FILE, e); + throw new IOException("Error while reading " + getMtabFileName(), e); } finally { // Close the streams try { in.close(); } catch (IOException e2) { - LOG.warn("Error closing the stream: " + MTAB_FILE, e2); + LOG.warn("Error closing the stream: " + getMtabFileName(), e2); } } @@ -334,7 +413,12 @@ public class CgroupsLCEResourcesHandler implements LCEResourcesHandler { } } else { throw new IOException("Not able to enforce cpu weights; cannot find " - + "cgroup for cpu controller in " + MTAB_FILE); + + "cgroup for cpu controller in " + getMtabFileName()); } } + + @VisibleForTesting + String getMtabFileName() { + return MTAB_FILE; + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/util/NodeManagerHardwareUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/util/NodeManagerHardwareUtils.java new file mode 100644 index 00000000000..07cf698429c --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/util/NodeManagerHardwareUtils.java @@ -0,0 +1,79 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.nodemanager.util; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.util.ResourceCalculatorPlugin; + +@InterfaceAudience.Private +@InterfaceStability.Unstable +public class NodeManagerHardwareUtils { + + /** + * + * Returns the fraction of CPU cores that should be used for YARN containers. + * The number is derived based on various configuration params such as + * YarnConfiguration.NM_RESOURCE_PERCENTAGE_PHYSICAL_CPU_LIMIT + * + * @param conf + * - Configuration object + * @return Fraction of CPU cores to be used for YARN containers + */ + public static float getContainersCores(Configuration conf) { + ResourceCalculatorPlugin plugin = + ResourceCalculatorPlugin.getResourceCalculatorPlugin(null, conf); + return NodeManagerHardwareUtils.getContainersCores(plugin, conf); + } + + /** + * + * Returns the fraction of CPU cores that should be used for YARN containers. + * The number is derived based on various configuration params such as + * YarnConfiguration.NM_RESOURCE_PERCENTAGE_PHYSICAL_CPU_LIMIT + * + * @param plugin + * - ResourceCalculatorPlugin object to determine hardware specs + * @param conf + * - Configuration object + * @return Fraction of CPU cores to be used for YARN containers + */ + public static float getContainersCores(ResourceCalculatorPlugin plugin, + Configuration conf) { + int numProcessors = plugin.getNumProcessors(); + int nodeCpuPercentage = + Math.min(conf.getInt( + YarnConfiguration.NM_RESOURCE_PERCENTAGE_PHYSICAL_CPU_LIMIT, + YarnConfiguration.DEFAULT_NM_RESOURCE_PERCENTAGE_PHYSICAL_CPU_LIMIT), + 100); + nodeCpuPercentage = Math.max(0, nodeCpuPercentage); + + if (nodeCpuPercentage == 0) { + String message = + "Illegal value for " + + YarnConfiguration.NM_RESOURCE_PERCENTAGE_PHYSICAL_CPU_LIMIT + + ". Value cannot be less than or equal to 0."; + throw new IllegalArgumentException(message); + } + + return (nodeCpuPercentage * numProcessors) / 100.0f; + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/util/TestCgroupsLCEResourcesHandler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/util/TestCgroupsLCEResourcesHandler.java index 611045ea2d9..45068988045 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/util/TestCgroupsLCEResourcesHandler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/util/TestCgroupsLCEResourcesHandler.java @@ -17,13 +17,18 @@ */ package org.apache.hadoop.yarn.server.nodemanager.util; +import org.apache.commons.io.FileUtils; +import org.apache.hadoop.yarn.server.nodemanager.LinuxContainerExecutor; +import org.apache.hadoop.yarn.util.ResourceCalculatorPlugin; import org.junit.Assert; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.util.Clock; import org.junit.Test; +import org.mockito.Mockito; -import java.io.File; -import java.io.FileOutputStream; +import java.io.*; +import java.util.List; +import java.util.Scanner; import java.util.UUID; import java.util.concurrent.CountDownLatch; @@ -70,4 +75,142 @@ public class TestCgroupsLCEResourcesHandler { Assert.assertFalse(handler.deleteCgroup(file.getPath())); } + static class MockLinuxContainerExecutor extends LinuxContainerExecutor { + @Override + public void mountCgroups(List x, String y) { + } + } + + static class CustomCgroupsLCEResourceHandler extends + CgroupsLCEResourcesHandler { + + String mtabFile; + int[] limits = new int[2]; + + @Override + int[] getOverallLimits(float x) { + return limits; + } + + void setMtabFile(String file) { + mtabFile = file; + } + + @Override + String getMtabFileName() { + return mtabFile; + } + } + + @Test + public void testInit() throws IOException { + LinuxContainerExecutor mockLCE = new MockLinuxContainerExecutor(); + CustomCgroupsLCEResourceHandler handler = + new CustomCgroupsLCEResourceHandler(); + YarnConfiguration conf = new YarnConfiguration(); + final int numProcessors = 4; + ResourceCalculatorPlugin plugin = + Mockito.mock(ResourceCalculatorPlugin.class); + Mockito.doReturn(numProcessors).when(plugin).getNumProcessors(); + handler.setConf(conf); + handler.initConfig(); + + // create mock cgroup + File cgroupDir = new File("target", UUID.randomUUID().toString()); + if (!cgroupDir.mkdir()) { + String message = "Could not create dir " + cgroupDir.getAbsolutePath(); + throw new IOException(message); + } + File cgroupMountDir = new File(cgroupDir.getAbsolutePath(), "hadoop-yarn"); + if (!cgroupMountDir.mkdir()) { + String message = + "Could not create dir " + cgroupMountDir.getAbsolutePath(); + throw new IOException(message); + } + + // create mock mtab + String mtabContent = + "none " + cgroupDir.getAbsolutePath() + " cgroup rw,relatime,cpu 0 0"; + File mockMtab = new File("target", UUID.randomUUID().toString()); + if (!mockMtab.exists()) { + if (!mockMtab.createNewFile()) { + String message = "Could not create file " + mockMtab.getAbsolutePath(); + throw new IOException(message); + } + } + FileWriter mtabWriter = new FileWriter(mockMtab.getAbsoluteFile()); + mtabWriter.write(mtabContent); + mtabWriter.close(); + mockMtab.deleteOnExit(); + + // setup our handler and call init() + handler.setMtabFile(mockMtab.getAbsolutePath()); + + // check values + // in this case, we're using all cpu so the files + // shouldn't exist(because init won't create them + handler.init(mockLCE, plugin); + File periodFile = new File(cgroupMountDir, "cpu.cfs_period_us"); + File quotaFile = new File(cgroupMountDir, "cpu.cfs_quota_us"); + Assert.assertFalse(periodFile.exists()); + Assert.assertFalse(quotaFile.exists()); + + // subset of cpu being used, files should be created + conf.setInt(YarnConfiguration.NM_RESOURCE_PERCENTAGE_PHYSICAL_CPU_LIMIT, 75); + handler.limits[0] = 100 * 1000; + handler.limits[1] = 1000 * 1000; + handler.init(mockLCE, plugin); + int period = readIntFromFile(periodFile); + int quota = readIntFromFile(quotaFile); + Assert.assertEquals(100 * 1000, period); + Assert.assertEquals(1000 * 1000, quota); + + // set cpu back to 100, quota should be -1 + conf.setInt(YarnConfiguration.NM_RESOURCE_PERCENTAGE_PHYSICAL_CPU_LIMIT, 100); + handler.limits[0] = 100 * 1000; + handler.limits[1] = 1000 * 1000; + handler.init(mockLCE, plugin); + quota = readIntFromFile(quotaFile); + Assert.assertEquals(-1, quota); + + FileUtils.deleteQuietly(cgroupDir); + } + + private int readIntFromFile(File targetFile) throws IOException { + Scanner scanner = new Scanner(targetFile); + if (scanner.hasNextInt()) { + return scanner.nextInt(); + } + return -1; + } + + @Test + public void testGetOverallLimits() { + + int expectedQuota = 1000 * 1000; + CgroupsLCEResourcesHandler handler = new CgroupsLCEResourcesHandler(); + + int[] ret = handler.getOverallLimits(2); + Assert.assertEquals(expectedQuota / 2, ret[0]); + Assert.assertEquals(expectedQuota, ret[1]); + + ret = handler.getOverallLimits(2000); + Assert.assertEquals(expectedQuota, ret[0]); + Assert.assertEquals(-1, ret[1]); + + int[] params = { 0, -1 }; + for (int cores : params) { + try { + handler.getOverallLimits(cores); + Assert.fail("Function call should throw error."); + } catch (IllegalArgumentException ie) { + // expected + } + } + + // test minimums + ret = handler.getOverallLimits(1000 * 1000); + Assert.assertEquals(1000 * 1000, ret[0]); + Assert.assertEquals(-1, ret[1]); + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/util/TestNodeManagerHardwareUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/util/TestNodeManagerHardwareUtils.java new file mode 100644 index 00000000000..e1af9483a89 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/util/TestNodeManagerHardwareUtils.java @@ -0,0 +1,72 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.nodemanager.util; + +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.util.ResourceCalculatorPlugin; +import org.junit.Assert; +import org.junit.Test; +import org.mockito.Mockito; + +public class TestNodeManagerHardwareUtils { + + @Test + public void testGetContainerCores() { + + YarnConfiguration conf = new YarnConfiguration(); + float ret; + final int numProcessors = 4; + ResourceCalculatorPlugin plugin = + Mockito.mock(ResourceCalculatorPlugin.class); + Mockito.doReturn(numProcessors).when(plugin).getNumProcessors(); + + conf.setInt(YarnConfiguration.NM_RESOURCE_PERCENTAGE_PHYSICAL_CPU_LIMIT, 0); + try { + NodeManagerHardwareUtils.getContainersCores(plugin, conf); + Assert.fail("getContainerCores should have thrown exception"); + } catch (IllegalArgumentException ie) { + // expected + } + + conf.setInt(YarnConfiguration.NM_RESOURCE_PERCENTAGE_PHYSICAL_CPU_LIMIT, + 100); + ret = NodeManagerHardwareUtils.getContainersCores(plugin, conf); + Assert.assertEquals(4, (int) ret); + + conf + .setInt(YarnConfiguration.NM_RESOURCE_PERCENTAGE_PHYSICAL_CPU_LIMIT, 50); + ret = NodeManagerHardwareUtils.getContainersCores(plugin, conf); + Assert.assertEquals(2, (int) ret); + + conf + .setInt(YarnConfiguration.NM_RESOURCE_PERCENTAGE_PHYSICAL_CPU_LIMIT, 75); + ret = NodeManagerHardwareUtils.getContainersCores(plugin, conf); + Assert.assertEquals(3, (int) ret); + + conf + .setInt(YarnConfiguration.NM_RESOURCE_PERCENTAGE_PHYSICAL_CPU_LIMIT, 85); + ret = NodeManagerHardwareUtils.getContainersCores(plugin, conf); + Assert.assertEquals(3.4, ret, 0.1); + + conf.setInt(YarnConfiguration.NM_RESOURCE_PERCENTAGE_PHYSICAL_CPU_LIMIT, + 110); + ret = NodeManagerHardwareUtils.getContainersCores(plugin, conf); + Assert.assertEquals(4, (int) ret); + } +}