YARN-160. Enhanced NodeManager to automatically obtain cpu/memory values from underlying OS when configured to do so. Contributed by Varun Vasudev.

(cherry picked from commit 500a1d9c76)
This commit is contained in:
Vinod Kumar Vavilapalli 2015-05-26 11:38:35 -07:00
parent e694c3339e
commit a67cb4826b
16 changed files with 696 additions and 107 deletions

View File

@ -88,6 +88,12 @@ public class DummyResourceCalculatorPlugin extends ResourceCalculatorPlugin {
return getConf().getInt(NUM_PROCESSORS, -1); return getConf().getInt(NUM_PROCESSORS, -1);
} }
/** {@inheritDoc} */
@Override
public int getNumCores() {
return getNumProcessors();
}
/** {@inheritDoc} */ /** {@inheritDoc} */
@Override @Override
public long getCpuFrequency() { public long getCpuFrequency() {

View File

@ -72,6 +72,9 @@ Release 2.8.0 - UNRELEASED
YARN-3541. Add version info on timeline service / generic history web UI YARN-3541. Add version info on timeline service / generic history web UI
and REST API. (Zhijie Shen via xgong) and REST API. (Zhijie Shen via xgong)
YARN-160. Enhanced NodeManager to automatically obtain cpu/memory values from
underlying OS when configured to do so. (Varun Vasudev via vinodkv)
IMPROVEMENTS IMPROVEMENTS
YARN-644. Basic null check is not performed on passed in arguments before YARN-644. Basic null check is not performed on passed in arguments before

View File

@ -804,10 +804,14 @@ public class YarnConfiguration extends Configuration {
public static final String YARN_TRACKING_URL_GENERATOR = public static final String YARN_TRACKING_URL_GENERATOR =
YARN_PREFIX + "tracking.url.generator"; YARN_PREFIX + "tracking.url.generator";
/** Amount of memory in GB that can be allocated for containers.*/ /** Amount of memory in MB that can be allocated for containers.*/
public static final String NM_PMEM_MB = NM_PREFIX + "resource.memory-mb"; public static final String NM_PMEM_MB = NM_PREFIX + "resource.memory-mb";
public static final int DEFAULT_NM_PMEM_MB = 8 * 1024; public static final int DEFAULT_NM_PMEM_MB = 8 * 1024;
/** Amount of memory in MB that has been reserved for non-yarn use. */
public static final String NM_SYSTEM_RESERVED_PMEM_MB = NM_PREFIX
+ "resource.system-reserved-memory-mb";
/** Specifies whether physical memory check is enabled. */ /** Specifies whether physical memory check is enabled. */
public static final String NM_PMEM_CHECK_ENABLED = NM_PREFIX public static final String NM_PMEM_CHECK_ENABLED = NM_PREFIX
+ "pmem-check-enabled"; + "pmem-check-enabled";
@ -827,12 +831,29 @@ public class YarnConfiguration extends Configuration {
public static final String NM_VCORES = NM_PREFIX + "resource.cpu-vcores"; public static final String NM_VCORES = NM_PREFIX + "resource.cpu-vcores";
public static final int DEFAULT_NM_VCORES = 8; public static final int DEFAULT_NM_VCORES = 8;
/** Count logical processors(like hyperthreads) as cores. */
public static final String NM_COUNT_LOGICAL_PROCESSORS_AS_CORES = NM_PREFIX
+ "resource.count-logical-processors-as-cores";
public static final boolean DEFAULT_NM_COUNT_LOGICAL_PROCESSORS_AS_CORES =
false;
/** Multiplier to convert physical cores to vcores. */
public static final String NM_PCORES_VCORES_MULTIPLIER = NM_PREFIX
+ "resource.pcores-vcores-multiplier";
public static final float DEFAULT_NM_PCORES_VCORES_MULTIPLIER = 1.0f;
/** Percentage of overall CPU which can be allocated for containers. */ /** Percentage of overall CPU which can be allocated for containers. */
public static final String NM_RESOURCE_PERCENTAGE_PHYSICAL_CPU_LIMIT = public static final String NM_RESOURCE_PERCENTAGE_PHYSICAL_CPU_LIMIT =
NM_PREFIX + "resource.percentage-physical-cpu-limit"; NM_PREFIX + "resource.percentage-physical-cpu-limit";
public static final int DEFAULT_NM_RESOURCE_PERCENTAGE_PHYSICAL_CPU_LIMIT = public static final int DEFAULT_NM_RESOURCE_PERCENTAGE_PHYSICAL_CPU_LIMIT =
100; 100;
/** Enable or disable node hardware capability detection. */
public static final String NM_ENABLE_HARDWARE_CAPABILITY_DETECTION =
NM_PREFIX + "resource.detect-hardware-capabilities";
public static final boolean DEFAULT_NM_ENABLE_HARDWARE_CAPABILITY_DETECTION =
false;
/** /**
* Prefix for disk configurations. Work in progress: This configuration * Prefix for disk configurations. Work in progress: This configuration
* parameter may be changed/removed in the future. * parameter may be changed/removed in the future.

View File

@ -25,9 +25,11 @@ import java.io.InputStreamReader;
import java.io.IOException; import java.io.IOException;
import java.math.BigInteger; import java.math.BigInteger;
import java.nio.charset.Charset; import java.nio.charset.Charset;
import java.util.HashSet;
import java.util.regex.Matcher; import java.util.regex.Matcher;
import java.util.regex.Pattern; import java.util.regex.Pattern;
import com.google.common.annotations.VisibleForTesting;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceAudience;
@ -58,16 +60,20 @@ public class LinuxResourceCalculatorPlugin extends ResourceCalculatorPlugin {
private static final String INACTIVE_STRING = "Inactive"; private static final String INACTIVE_STRING = "Inactive";
/** /**
* Patterns for parsing /proc/cpuinfo * Patterns for parsing /proc/cpuinfo.
*/ */
private static final String PROCFS_CPUINFO = "/proc/cpuinfo"; private static final String PROCFS_CPUINFO = "/proc/cpuinfo";
private static final Pattern PROCESSOR_FORMAT = private static final Pattern PROCESSOR_FORMAT =
Pattern.compile("^processor[ \t]:[ \t]*([0-9]*)"); Pattern.compile("^processor[ \t]:[ \t]*([0-9]*)");
private static final Pattern FREQUENCY_FORMAT = private static final Pattern FREQUENCY_FORMAT =
Pattern.compile("^cpu MHz[ \t]*:[ \t]*([0-9.]*)"); Pattern.compile("^cpu MHz[ \t]*:[ \t]*([0-9.]*)");
private static final Pattern PHYSICAL_ID_FORMAT =
Pattern.compile("^physical id[ \t]*:[ \t]*([0-9]*)");
private static final Pattern CORE_ID_FORMAT =
Pattern.compile("^core id[ \t]*:[ \t]*([0-9]*)");
/** /**
* Pattern for parsing /proc/stat * Pattern for parsing /proc/stat.
*/ */
private static final String PROCFS_STAT = "/proc/stat"; private static final String PROCFS_STAT = "/proc/stat";
private static final Pattern CPU_TIME_FORMAT = private static final Pattern CPU_TIME_FORMAT =
@ -78,21 +84,24 @@ public class LinuxResourceCalculatorPlugin extends ResourceCalculatorPlugin {
private String procfsMemFile; private String procfsMemFile;
private String procfsCpuFile; private String procfsCpuFile;
private String procfsStatFile; private String procfsStatFile;
long jiffyLengthInMillis; private long jiffyLengthInMillis;
private long ramSize = 0; private long ramSize = 0;
private long swapSize = 0; private long swapSize = 0;
private long ramSizeFree = 0; // free ram space on the machine (kB) private long ramSizeFree = 0; // free ram space on the machine (kB)
private long swapSizeFree = 0; // free swap space on the machine (kB) private long swapSizeFree = 0; // free swap space on the machine (kB)
private long inactiveSize = 0; // inactive cache memory (kB) private long inactiveSize = 0; // inactive cache memory (kB)
private int numProcessors = 0; // number of processors on the system /* number of logical processors on the system. */
private int numProcessors = 0;
/* number of physical cores on the system. */
private int numCores = 0;
private long cpuFrequency = 0L; // CPU frequency on the system (kHz) private long cpuFrequency = 0L; // CPU frequency on the system (kHz)
boolean readMemInfoFile = false; private boolean readMemInfoFile = false;
boolean readCpuInfoFile = false; private boolean readCpuInfoFile = false;
/** /**
* Get current time * Get current time.
* @return Unix time stamp in millisecond * @return Unix time stamp in millisecond
*/ */
long getCurrentTime() { long getCurrentTime() {
@ -106,7 +115,7 @@ public class LinuxResourceCalculatorPlugin extends ResourceCalculatorPlugin {
/** /**
* Constructor which allows assigning the /proc/ directories. This will be * Constructor which allows assigning the /proc/ directories. This will be
* used only in unit tests * used only in unit tests.
* @param procfsMemFile fake file for /proc/meminfo * @param procfsMemFile fake file for /proc/meminfo
* @param procfsCpuFile fake file for /proc/cpuinfo * @param procfsCpuFile fake file for /proc/cpuinfo
* @param procfsStatFile fake file for /proc/stat * @param procfsStatFile fake file for /proc/stat
@ -124,14 +133,14 @@ public class LinuxResourceCalculatorPlugin extends ResourceCalculatorPlugin {
} }
/** /**
* Read /proc/meminfo, parse and compute memory information only once * Read /proc/meminfo, parse and compute memory information only once.
*/ */
private void readProcMemInfoFile() { private void readProcMemInfoFile() {
readProcMemInfoFile(false); readProcMemInfoFile(false);
} }
/** /**
* Read /proc/meminfo, parse and compute memory information * Read /proc/meminfo, parse and compute memory information.
* @param readAgain if false, read only on the first time * @param readAgain if false, read only on the first time
*/ */
private void readProcMemInfoFile(boolean readAgain) { private void readProcMemInfoFile(boolean readAgain) {
@ -141,18 +150,20 @@ public class LinuxResourceCalculatorPlugin extends ResourceCalculatorPlugin {
} }
// Read "/proc/memInfo" file // Read "/proc/memInfo" file
BufferedReader in = null; BufferedReader in;
InputStreamReader fReader = null; InputStreamReader fReader;
try { try {
fReader = new InputStreamReader( fReader = new InputStreamReader(
new FileInputStream(procfsMemFile), Charset.forName("UTF-8")); new FileInputStream(procfsMemFile), Charset.forName("UTF-8"));
in = new BufferedReader(fReader); in = new BufferedReader(fReader);
} catch (FileNotFoundException f) { } catch (FileNotFoundException f) {
// shouldn't happen.... // shouldn't happen....
LOG.warn("Couldn't read " + procfsMemFile
+ "; can't determine memory settings");
return; return;
} }
Matcher mat = null; Matcher mat;
try { try {
String str = in.readLine(); String str = in.readLine();
@ -193,27 +204,31 @@ public class LinuxResourceCalculatorPlugin extends ResourceCalculatorPlugin {
} }
/** /**
* Read /proc/cpuinfo, parse and calculate CPU information * Read /proc/cpuinfo, parse and calculate CPU information.
*/ */
private void readProcCpuInfoFile() { private void readProcCpuInfoFile() {
// This directory needs to be read only once // This directory needs to be read only once
if (readCpuInfoFile) { if (readCpuInfoFile) {
return; return;
} }
HashSet<String> coreIdSet = new HashSet<>();
// Read "/proc/cpuinfo" file // Read "/proc/cpuinfo" file
BufferedReader in = null; BufferedReader in;
InputStreamReader fReader = null; InputStreamReader fReader;
try { try {
fReader = new InputStreamReader( fReader = new InputStreamReader(
new FileInputStream(procfsCpuFile), Charset.forName("UTF-8")); new FileInputStream(procfsCpuFile), Charset.forName("UTF-8"));
in = new BufferedReader(fReader); in = new BufferedReader(fReader);
} catch (FileNotFoundException f) { } catch (FileNotFoundException f) {
// shouldn't happen.... // shouldn't happen....
LOG.warn("Couldn't read " + procfsCpuFile + "; can't determine cpu info");
return; return;
} }
Matcher mat = null; Matcher mat;
try { try {
numProcessors = 0; numProcessors = 0;
numCores = 1;
String currentPhysicalId = "";
String str = in.readLine(); String str = in.readLine();
while (str != null) { while (str != null) {
mat = PROCESSOR_FORMAT.matcher(str); mat = PROCESSOR_FORMAT.matcher(str);
@ -224,6 +239,15 @@ public class LinuxResourceCalculatorPlugin extends ResourceCalculatorPlugin {
if (mat.find()) { if (mat.find()) {
cpuFrequency = (long)(Double.parseDouble(mat.group(1)) * 1000); // kHz cpuFrequency = (long)(Double.parseDouble(mat.group(1)) * 1000); // kHz
} }
mat = PHYSICAL_ID_FORMAT.matcher(str);
if (mat.find()) {
currentPhysicalId = str;
}
mat = CORE_ID_FORMAT.matcher(str);
if (mat.find()) {
coreIdSet.add(currentPhysicalId + " " + str);
numCores = coreIdSet.size();
}
str = in.readLine(); str = in.readLine();
} }
} catch (IOException io) { } catch (IOException io) {
@ -245,12 +269,12 @@ public class LinuxResourceCalculatorPlugin extends ResourceCalculatorPlugin {
} }
/** /**
* Read /proc/stat file, parse and calculate cumulative CPU * Read /proc/stat file, parse and calculate cumulative CPU.
*/ */
private void readProcStatFile() { private void readProcStatFile() {
// Read "/proc/stat" file // Read "/proc/stat" file
BufferedReader in = null; BufferedReader in;
InputStreamReader fReader = null; InputStreamReader fReader;
try { try {
fReader = new InputStreamReader( fReader = new InputStreamReader(
new FileInputStream(procfsStatFile), Charset.forName("UTF-8")); new FileInputStream(procfsStatFile), Charset.forName("UTF-8"));
@ -260,7 +284,7 @@ public class LinuxResourceCalculatorPlugin extends ResourceCalculatorPlugin {
return; return;
} }
Matcher mat = null; Matcher mat;
try { try {
String str = in.readLine(); String str = in.readLine();
while (str != null) { while (str != null) {
@ -328,6 +352,13 @@ public class LinuxResourceCalculatorPlugin extends ResourceCalculatorPlugin {
return numProcessors; return numProcessors;
} }
/** {@inheritDoc} */
@Override
public int getNumCores() {
readProcCpuInfoFile();
return numCores;
}
/** {@inheritDoc} */ /** {@inheritDoc} */
@Override @Override
public long getCpuFrequency() { public long getCpuFrequency() {
@ -354,9 +385,9 @@ public class LinuxResourceCalculatorPlugin extends ResourceCalculatorPlugin {
} }
/** /**
* Test the {@link LinuxResourceCalculatorPlugin} * Test the {@link LinuxResourceCalculatorPlugin}.
* *
* @param args * @param args - arguments to this calculator test
*/ */
public static void main(String[] args) { public static void main(String[] args) {
LinuxResourceCalculatorPlugin plugin = new LinuxResourceCalculatorPlugin(); LinuxResourceCalculatorPlugin plugin = new LinuxResourceCalculatorPlugin();
@ -380,4 +411,13 @@ public class LinuxResourceCalculatorPlugin extends ResourceCalculatorPlugin {
} }
System.out.println("CPU usage % : " + plugin.getCpuUsage()); System.out.println("CPU usage % : " + plugin.getCpuUsage());
} }
@VisibleForTesting
void setReadCpuInfoFile(boolean readCpuInfoFileValue) {
this.readCpuInfoFile = readCpuInfoFileValue;
}
public long getJiffyLengthInMillis() {
return this.jiffyLengthInMillis;
}
} }

View File

@ -64,12 +64,19 @@ public abstract class ResourceCalculatorPlugin extends Configured {
public abstract long getAvailablePhysicalMemorySize(); public abstract long getAvailablePhysicalMemorySize();
/** /**
* Obtain the total number of processors present on the system. * Obtain the total number of logical processors present on the system.
* *
* @return number of processors * @return number of logical processors
*/ */
public abstract int getNumProcessors(); public abstract int getNumProcessors();
/**
* Obtain total number of physical cores present on the system.
*
* @return number of physical cores
*/
public abstract int getNumCores();
/** /**
* Obtain the CPU frequency of on the system. * Obtain the CPU frequency of on the system.
* *

View File

@ -147,6 +147,12 @@ public class WindowsResourceCalculatorPlugin extends ResourceCalculatorPlugin {
return numProcessors; return numProcessors;
} }
/** {@inheritDoc} */
@Override
public int getNumCores() {
return getNumProcessors();
}
/** {@inheritDoc} */ /** {@inheritDoc} */
@Override @Override
public long getCpuFrequency() { public long getCpuFrequency() {

View File

@ -890,9 +890,25 @@
<property> <property>
<description>Amount of physical memory, in MB, that can be allocated <description>Amount of physical memory, in MB, that can be allocated
for containers.</description> for containers. If set to -1 and
yarn.nodemanager.resource.detect-hardware-capabilities is true, it is
automatically calculated(in case of Windows and Linux).
In other cases, the default is 8192MB.
</description>
<name>yarn.nodemanager.resource.memory-mb</name> <name>yarn.nodemanager.resource.memory-mb</name>
<value>8192</value> <value>-1</value>
</property>
<property>
<description>Amount of physical memory, in MB, that is reserved
for non-YARN processes. This configuration is only used if
yarn.nodemanager.resource.detect-hardware-capabilities is set
to true and yarn.nodemanager.resource.memory-mb is -1. If set
to -1, this amount is calculated as
20% of (system memory - 2*HADOOP_HEAPSIZE)
</description>
<name>yarn.nodemanager.resource.system-reserved-memory-mb</name>
<value>-1</value>
</property> </property>
<property> <property>
@ -923,9 +939,34 @@
<description>Number of vcores that can be allocated <description>Number of vcores that can be allocated
for containers. This is used by the RM scheduler when allocating for containers. This is used by the RM scheduler when allocating
resources for containers. This is not used to limit the number of resources for containers. This is not used to limit the number of
physical cores used by YARN containers.</description> CPUs used by YARN containers. If it is set to -1 and
yarn.nodemanager.resource.detect-hardware-capabilities is true, it is
automatically determined from the hardware in case of Windows and Linux.
In other cases, number of vcores is 8 by default.</description>
<name>yarn.nodemanager.resource.cpu-vcores</name> <name>yarn.nodemanager.resource.cpu-vcores</name>
<value>8</value> <value>-1</value>
</property>
<property>
<description>Flag to determine if logical processors(such as
hyperthreads) should be counted as cores. Only applicable on Linux
when yarn.nodemanager.resource.cpu-vcores is set to -1 and
yarn.nodemanager.resource.detect-hardware-capabilities is true.
</description>
<name>yarn.nodemanager.resource.count-logical-processors-as-cores</name>
<value>false</value>
</property>
<property>
<description>Multiplier to determine how to convert phyiscal cores to
vcores. This value is used if yarn.nodemanager.resource.cpu-vcores
is set to -1(which implies auto-calculate vcores) and
yarn.nodemanager.resource.detect-hardware-capabilities is set to true. The
number of vcores will be calculated as
number of CPUs * multiplier.
</description>
<name>yarn.nodemanager.resource.pcores-vcores-multiplier</name>
<value>1.0</value>
</property> </property>
<property> <property>
@ -938,6 +979,14 @@
<value>100</value> <value>100</value>
</property> </property>
<property>
<description>Enable auto-detection of node capabilities such as
memory and CPU.
</description>
<name>yarn.nodemanager.resource.detect-hardware-capabilities</name>
<value>false</value>
</property>
<property> <property>
<description>NM Webapp address.</description> <description>NM Webapp address.</description>
<name>yarn.nodemanager.webapp.address</name> <name>yarn.nodemanager.webapp.address</name>

View File

@ -21,11 +21,13 @@ package org.apache.hadoop.yarn.util;
import java.io.File; import java.io.File;
import java.io.FileWriter; import java.io.FileWriter;
import java.io.IOException; import java.io.IOException;
import java.util.ArrayList;
import java.util.Random; import java.util.Random;
import org.apache.commons.io.IOUtils;
import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.Path;
import org.junit.Test; import org.junit.Test;
import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertEquals;
/** /**
@ -51,7 +53,7 @@ public class TestLinuxResourceCalculatorPlugin {
return currentTime; return currentTime;
} }
public void advanceTime(long adv) { public void advanceTime(long adv) {
currentTime += adv * jiffyLengthInMillis; currentTime += adv * this.getJiffyLengthInMillis();
} }
} }
private static final FakeLinuxResourceCalculatorPlugin plugin; private static final FakeLinuxResourceCalculatorPlugin plugin;
@ -109,9 +111,9 @@ public class TestLinuxResourceCalculatorPlugin {
"stepping : 2\n" + "stepping : 2\n" +
"cpu MHz : %f\n" + "cpu MHz : %f\n" +
"cache size : 1024 KB\n" + "cache size : 1024 KB\n" +
"physical id : 0\n" + "physical id : %s\n" +
"siblings : 2\n" + "siblings : 2\n" +
"core id : 0\n" + "core id : %s\n" +
"cpu cores : 2\n" + "cpu cores : 2\n" +
"fpu : yes\n" + "fpu : yes\n" +
"fpu_exception : yes\n" + "fpu_exception : yes\n" +
@ -151,8 +153,9 @@ public class TestLinuxResourceCalculatorPlugin {
long cpuFrequencyKHz = 2392781; long cpuFrequencyKHz = 2392781;
String fileContent = ""; String fileContent = "";
for (int i = 0; i < numProcessors; i++) { for (int i = 0; i < numProcessors; i++) {
fileContent += String.format(CPUINFO_FORMAT, i, cpuFrequencyKHz / 1000D) + fileContent +=
"\n"; String.format(CPUINFO_FORMAT, i, cpuFrequencyKHz / 1000D, 0, 0)
+ "\n";
} }
File tempFile = new File(FAKE_CPUFILE); File tempFile = new File(FAKE_CPUFILE);
tempFile.deleteOnExit(); tempFile.deleteOnExit();
@ -232,4 +235,90 @@ public class TestLinuxResourceCalculatorPlugin {
assertEquals(plugin.getPhysicalMemorySize(), 1024L * memTotal); assertEquals(plugin.getPhysicalMemorySize(), 1024L * memTotal);
assertEquals(plugin.getVirtualMemorySize(), 1024L * (memTotal + swapTotal)); assertEquals(plugin.getVirtualMemorySize(), 1024L * (memTotal + swapTotal));
} }
@Test
public void testCoreCounts() throws IOException {
String fileContent = "";
// single core, hyper threading
long numProcessors = 2;
long cpuFrequencyKHz = 2392781;
for (int i = 0; i < numProcessors; i++) {
fileContent =
fileContent.concat(String.format(CPUINFO_FORMAT, i,
cpuFrequencyKHz / 1000D, 0, 0));
fileContent = fileContent.concat("\n");
}
writeFakeCPUInfoFile(fileContent);
plugin.setReadCpuInfoFile(false);
assertEquals(numProcessors, plugin.getNumProcessors());
assertEquals(1, plugin.getNumCores());
// single socket quad core, no hyper threading
fileContent = "";
numProcessors = 4;
for (int i = 0; i < numProcessors; i++) {
fileContent =
fileContent.concat(String.format(CPUINFO_FORMAT, i,
cpuFrequencyKHz / 1000D, 0, i));
fileContent = fileContent.concat("\n");
}
writeFakeCPUInfoFile(fileContent);
plugin.setReadCpuInfoFile(false);
assertEquals(numProcessors, plugin.getNumProcessors());
assertEquals(4, plugin.getNumCores());
// dual socket single core, hyper threading
fileContent = "";
numProcessors = 4;
for (int i = 0; i < numProcessors; i++) {
fileContent =
fileContent.concat(String.format(CPUINFO_FORMAT, i,
cpuFrequencyKHz / 1000D, i / 2, 0));
fileContent = fileContent.concat("\n");
}
writeFakeCPUInfoFile(fileContent);
plugin.setReadCpuInfoFile(false);
assertEquals(numProcessors, plugin.getNumProcessors());
assertEquals(2, plugin.getNumCores());
// dual socket, dual core, no hyper threading
fileContent = "";
numProcessors = 4;
for (int i = 0; i < numProcessors; i++) {
fileContent =
fileContent.concat(String.format(CPUINFO_FORMAT, i,
cpuFrequencyKHz / 1000D, i / 2, i % 2));
fileContent = fileContent.concat("\n");
}
writeFakeCPUInfoFile(fileContent);
plugin.setReadCpuInfoFile(false);
assertEquals(numProcessors, plugin.getNumProcessors());
assertEquals(4, plugin.getNumCores());
// dual socket, dual core, hyper threading
fileContent = "";
numProcessors = 8;
for (int i = 0; i < numProcessors; i++) {
fileContent =
fileContent.concat(String.format(CPUINFO_FORMAT, i,
cpuFrequencyKHz / 1000D, i / 4, (i % 4) / 2));
fileContent = fileContent.concat("\n");
}
writeFakeCPUInfoFile(fileContent);
plugin.setReadCpuInfoFile(false);
assertEquals(numProcessors, plugin.getNumProcessors());
assertEquals(4, plugin.getNumCores());
}
private void writeFakeCPUInfoFile(String content) throws IOException {
File tempFile = new File(FAKE_CPUFILE);
FileWriter fWriter = new FileWriter(FAKE_CPUFILE);
tempFile.deleteOnExit();
try {
fWriter.write(content);
} finally {
IOUtils.closeQuietly(fWriter);
}
}
} }

View File

@ -45,6 +45,7 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container; import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerDiagnosticsUpdateEvent; import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerDiagnosticsUpdateEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainerLaunch; import org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainerLaunch;
import org.apache.hadoop.yarn.server.nodemanager.util.NodeManagerHardwareUtils;
import org.apache.hadoop.yarn.server.nodemanager.executor.ContainerLivenessContext; import org.apache.hadoop.yarn.server.nodemanager.executor.ContainerLivenessContext;
import org.apache.hadoop.yarn.server.nodemanager.executor.ContainerReacquisitionContext; import org.apache.hadoop.yarn.server.nodemanager.executor.ContainerReacquisitionContext;
import org.apache.hadoop.yarn.server.nodemanager.executor.ContainerSignalContext; import org.apache.hadoop.yarn.server.nodemanager.executor.ContainerSignalContext;
@ -372,28 +373,16 @@ public abstract class ContainerExecutor implements Configurable {
YarnConfiguration.NM_WINDOWS_CONTAINER_CPU_LIMIT_ENABLED, YarnConfiguration.NM_WINDOWS_CONTAINER_CPU_LIMIT_ENABLED,
YarnConfiguration.DEFAULT_NM_WINDOWS_CONTAINER_CPU_LIMIT_ENABLED)) { YarnConfiguration.DEFAULT_NM_WINDOWS_CONTAINER_CPU_LIMIT_ENABLED)) {
int containerVCores = resource.getVirtualCores(); int containerVCores = resource.getVirtualCores();
int nodeVCores = conf.getInt(YarnConfiguration.NM_VCORES, int nodeVCores = NodeManagerHardwareUtils.getVCores(conf);
YarnConfiguration.DEFAULT_NM_VCORES); int nodeCpuPercentage =
// cap overall usage to the number of cores allocated to YARN NodeManagerHardwareUtils.getNodeCpuPercentage(conf);
int nodeCpuPercentage = Math
.min( float containerCpuPercentage =
conf.getInt( (float) (nodeCpuPercentage * containerVCores) / nodeVCores;
YarnConfiguration.NM_RESOURCE_PERCENTAGE_PHYSICAL_CPU_LIMIT,
YarnConfiguration.DEFAULT_NM_RESOURCE_PERCENTAGE_PHYSICAL_CPU_LIMIT),
100);
nodeCpuPercentage = Math.max(0, nodeCpuPercentage);
if (nodeCpuPercentage == 0) {
String message = "Illegal value for "
+ YarnConfiguration.NM_RESOURCE_PERCENTAGE_PHYSICAL_CPU_LIMIT
+ ". Value cannot be less than or equal to 0.";
throw new IllegalArgumentException(message);
}
float yarnVCores = (nodeCpuPercentage * nodeVCores) / 100.0f;
// CPU should be set to a percentage * 100, e.g. 20% cpu rate limit // CPU should be set to a percentage * 100, e.g. 20% cpu rate limit
// should be set as 20 * 100. The following setting is equal to: // should be set as 20 * 100.
// 100 * (100 * (vcores / Total # of cores allocated to YARN)) cpuRate = Math.min(10000, (int) (containerCpuPercentage * 100));
cpuRate = Math.min(10000,
(int) ((containerVCores * 10000) / yarnVCores));
} }
} }
return new String[] { Shell.WINUTILS, "task", "create", "-m", return new String[] { Shell.WINUTILS, "task", "create", "-m",

View File

@ -76,6 +76,8 @@ import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.Ap
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container; import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
import org.apache.hadoop.yarn.server.nodemanager.metrics.NodeManagerMetrics; import org.apache.hadoop.yarn.server.nodemanager.metrics.NodeManagerMetrics;
import org.apache.hadoop.yarn.server.nodemanager.nodelabels.NodeLabelsProvider; import org.apache.hadoop.yarn.server.nodemanager.nodelabels.NodeLabelsProvider;
import org.apache.hadoop.yarn.util.Records;
import org.apache.hadoop.yarn.server.nodemanager.util.NodeManagerHardwareUtils;
import org.apache.hadoop.yarn.util.YarnVersionInfo; import org.apache.hadoop.yarn.util.YarnVersionInfo;
import com.google.common.annotations.VisibleForTesting; import com.google.common.annotations.VisibleForTesting;
@ -157,18 +159,16 @@ public class NodeStatusUpdaterImpl extends AbstractService implements
@Override @Override
protected void serviceInit(Configuration conf) throws Exception { protected void serviceInit(Configuration conf) throws Exception {
int memoryMb = int memoryMb = NodeManagerHardwareUtils.getContainerMemoryMB(conf);
conf.getInt(
YarnConfiguration.NM_PMEM_MB, YarnConfiguration.DEFAULT_NM_PMEM_MB);
float vMemToPMem = float vMemToPMem =
conf.getFloat( conf.getFloat(
YarnConfiguration.NM_VMEM_PMEM_RATIO, YarnConfiguration.NM_VMEM_PMEM_RATIO,
YarnConfiguration.DEFAULT_NM_VMEM_PMEM_RATIO); YarnConfiguration.DEFAULT_NM_VMEM_PMEM_RATIO);
int virtualMemoryMb = (int)Math.ceil(memoryMb * vMemToPMem); int virtualMemoryMb = (int)Math.ceil(memoryMb * vMemToPMem);
int virtualCores = int virtualCores = NodeManagerHardwareUtils.getVCores(conf);
conf.getInt( LOG.info("Nodemanager resources: memory set to " + memoryMb + "MB.");
YarnConfiguration.NM_VCORES, YarnConfiguration.DEFAULT_NM_VCORES); LOG.info("Nodemanager resources: vcores set to " + virtualCores + ".");
this.totalResource = Resource.newInstance(memoryMb, virtualCores); this.totalResource = Resource.newInstance(memoryMb, virtualCores);
metrics.addResource(totalResource); metrics.addResource(totalResource);

View File

@ -117,14 +117,11 @@ public class ContainersMonitorImpl extends AbstractService implements
conf.getLong(YarnConfiguration.NM_CONTAINER_METRICS_PERIOD_MS, conf.getLong(YarnConfiguration.NM_CONTAINER_METRICS_PERIOD_MS,
YarnConfiguration.DEFAULT_NM_CONTAINER_METRICS_PERIOD_MS); YarnConfiguration.DEFAULT_NM_CONTAINER_METRICS_PERIOD_MS);
long configuredPMemForContainers = conf.getLong( long configuredPMemForContainers =
YarnConfiguration.NM_PMEM_MB, NodeManagerHardwareUtils.getContainerMemoryMB(conf) * 1024 * 1024L;
YarnConfiguration.DEFAULT_NM_PMEM_MB) * 1024 * 1024l;
long configuredVCoresForContainers = conf.getLong(
YarnConfiguration.NM_VCORES,
YarnConfiguration.DEFAULT_NM_VCORES);
long configuredVCoresForContainers =
NodeManagerHardwareUtils.getVCores(conf);
// Setting these irrespective of whether checks are enabled. Required in // Setting these irrespective of whether checks are enabled. Required in
// the UI. // the UI.

View File

@ -22,7 +22,6 @@ import java.io.BufferedReader;
import java.io.File; import java.io.File;
import java.io.FileInputStream; import java.io.FileInputStream;
import java.io.FileOutputStream; import java.io.FileOutputStream;
import java.io.FileReader;
import java.io.IOException; import java.io.IOException;
import java.io.InputStreamReader; import java.io.InputStreamReader;
import java.io.OutputStreamWriter; import java.io.OutputStreamWriter;
@ -83,6 +82,7 @@ public class CgroupsLCEResourcesHandler implements LCEResourcesHandler {
Clock clock; Clock clock;
private float yarnProcessors; private float yarnProcessors;
int nodeVCores;
public CgroupsLCEResourcesHandler() { public CgroupsLCEResourcesHandler() {
this.controllerPaths = new HashMap<String, String>(); this.controllerPaths = new HashMap<String, String>();
@ -152,9 +152,11 @@ public class CgroupsLCEResourcesHandler implements LCEResourcesHandler {
initializeControllerPaths(); initializeControllerPaths();
nodeVCores = NodeManagerHardwareUtils.getVCores(plugin, conf);
// cap overall usage to the number of cores allocated to YARN // cap overall usage to the number of cores allocated to YARN
yarnProcessors = NodeManagerHardwareUtils.getContainersCores(plugin, conf); yarnProcessors = NodeManagerHardwareUtils.getContainersCPUs(plugin, conf);
int systemProcessors = plugin.getNumProcessors(); int systemProcessors = NodeManagerHardwareUtils.getNodeCPUs(plugin, conf);
if (systemProcessors != (int) yarnProcessors) { if (systemProcessors != (int) yarnProcessors) {
LOG.info("YARN containers restricted to " + yarnProcessors + " cores"); LOG.info("YARN containers restricted to " + yarnProcessors + " cores");
int[] limits = getOverallLimits(yarnProcessors); int[] limits = getOverallLimits(yarnProcessors);
@ -368,9 +370,6 @@ public class CgroupsLCEResourcesHandler implements LCEResourcesHandler {
updateCgroup(CONTROLLER_CPU, containerName, "shares", updateCgroup(CONTROLLER_CPU, containerName, "shares",
String.valueOf(cpuShares)); String.valueOf(cpuShares));
if (strictResourceUsageMode) { if (strictResourceUsageMode) {
int nodeVCores =
conf.getInt(YarnConfiguration.NM_VCORES,
YarnConfiguration.DEFAULT_NM_VCORES);
if (nodeVCores != containerVCores) { if (nodeVCores != containerVCores) {
float containerCPU = float containerCPU =
(containerVCores * yarnProcessors) / (float) nodeVCores; (containerVCores * yarnProcessors) / (float) nodeVCores;

View File

@ -18,35 +18,84 @@
package org.apache.hadoop.yarn.server.nodemanager.util; package org.apache.hadoop.yarn.server.nodemanager.util;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.util.ResourceCalculatorPlugin; import org.apache.hadoop.yarn.util.ResourceCalculatorPlugin;
/**
* Helper class to determine hardware related characteristics such as the
* number of processors and the amount of memory on the node.
*/
@InterfaceAudience.Private @InterfaceAudience.Private
@InterfaceStability.Unstable @InterfaceStability.Unstable
public class NodeManagerHardwareUtils { public class NodeManagerHardwareUtils {
private static final Log LOG = LogFactory
.getLog(NodeManagerHardwareUtils.class);
/** /**
* *
* Returns the fraction of CPU cores that should be used for YARN containers. * Returns the number of CPUs on the node. This value depends on the
* configuration setting which decides whether to count logical processors
* (such as hyperthreads) as cores or not.
*
* @param conf
* - Configuration object
* @return Number of CPUs
*/
public static int getNodeCPUs(Configuration conf) {
ResourceCalculatorPlugin plugin =
ResourceCalculatorPlugin.getResourceCalculatorPlugin(null, conf);
return NodeManagerHardwareUtils.getNodeCPUs(plugin, conf);
}
/**
*
* Returns the number of CPUs on the node. This value depends on the
* configuration setting which decides whether to count logical processors
* (such as hyperthreads) as cores or not.
*
* @param plugin
* - ResourceCalculatorPlugin object to determine hardware specs
* @param conf
* - Configuration object
* @return Number of CPU cores on the node.
*/
public static int getNodeCPUs(ResourceCalculatorPlugin plugin,
Configuration conf) {
int numProcessors = plugin.getNumProcessors();
boolean countLogicalCores =
conf.getBoolean(YarnConfiguration.NM_COUNT_LOGICAL_PROCESSORS_AS_CORES,
YarnConfiguration.DEFAULT_NM_COUNT_LOGICAL_PROCESSORS_AS_CORES);
if (!countLogicalCores) {
numProcessors = plugin.getNumCores();
}
return numProcessors;
}
/**
*
* Returns the fraction of CPUs that should be used for YARN containers.
* The number is derived based on various configuration params such as * The number is derived based on various configuration params such as
* YarnConfiguration.NM_RESOURCE_PERCENTAGE_PHYSICAL_CPU_LIMIT * YarnConfiguration.NM_RESOURCE_PERCENTAGE_PHYSICAL_CPU_LIMIT
* *
* @param conf * @param conf
* - Configuration object * - Configuration object
* @return Fraction of CPU cores to be used for YARN containers * @return Fraction of CPUs to be used for YARN containers
*/ */
public static float getContainersCores(Configuration conf) { public static float getContainersCPUs(Configuration conf) {
ResourceCalculatorPlugin plugin = ResourceCalculatorPlugin plugin =
ResourceCalculatorPlugin.getResourceCalculatorPlugin(null, conf); ResourceCalculatorPlugin.getResourceCalculatorPlugin(null, conf);
return NodeManagerHardwareUtils.getContainersCores(plugin, conf); return NodeManagerHardwareUtils.getContainersCPUs(plugin, conf);
} }
/** /**
* *
* Returns the fraction of CPU cores that should be used for YARN containers. * Returns the fraction of CPUs that should be used for YARN containers.
* The number is derived based on various configuration params such as * The number is derived based on various configuration params such as
* YarnConfiguration.NM_RESOURCE_PERCENTAGE_PHYSICAL_CPU_LIMIT * YarnConfiguration.NM_RESOURCE_PERCENTAGE_PHYSICAL_CPU_LIMIT
* *
@ -54,11 +103,11 @@ public class NodeManagerHardwareUtils {
* - ResourceCalculatorPlugin object to determine hardware specs * - ResourceCalculatorPlugin object to determine hardware specs
* @param conf * @param conf
* - Configuration object * - Configuration object
* @return Fraction of CPU cores to be used for YARN containers * @return Fraction of CPUs to be used for YARN containers
*/ */
public static float getContainersCores(ResourceCalculatorPlugin plugin, public static float getContainersCPUs(ResourceCalculatorPlugin plugin,
Configuration conf) { Configuration conf) {
int numProcessors = plugin.getNumProcessors(); int numProcessors = getNodeCPUs(plugin, conf);
int nodeCpuPercentage = getNodeCpuPercentage(conf); int nodeCpuPercentage = getNodeCpuPercentage(conf);
return (nodeCpuPercentage * numProcessors) / 100.0f; return (nodeCpuPercentage * numProcessors) / 100.0f;
@ -88,4 +137,177 @@ public class NodeManagerHardwareUtils {
} }
return nodeCpuPercentage; return nodeCpuPercentage;
} }
/**
* Function to return the number of vcores on the system that can be used for
* YARN containers. If a number is specified in the configuration file, then
* that number is returned. If nothing is specified - 1. If the OS is an
* "unknown" OS(one for which we don't have ResourceCalculatorPlugin
* implemented), return the default NodeManager cores. 2. If the config
* variable yarn.nodemanager.cpu.use_logical_processors is set to true, it
* returns the logical processor count(count hyperthreads as cores), else it
* returns the physical cores count.
*
* @param conf
* - the configuration for the NodeManager
* @return the number of cores to be used for YARN containers
*
*/
public static int getVCores(Configuration conf) {
// is this os for which we can determine cores?
ResourceCalculatorPlugin plugin =
ResourceCalculatorPlugin.getResourceCalculatorPlugin(null, conf);
return NodeManagerHardwareUtils.getVCores(plugin, conf);
}
/**
* Function to return the number of vcores on the system that can be used for
* YARN containers. If a number is specified in the configuration file, then
* that number is returned. If nothing is specified - 1. If the OS is an
* "unknown" OS(one for which we don't have ResourceCalculatorPlugin
* implemented), return the default NodeManager cores. 2. If the config
* variable yarn.nodemanager.cpu.use_logical_processors is set to true, it
* returns the logical processor count(count hyperthreads as cores), else it
* returns the physical cores count.
*
* @param plugin
* - ResourceCalculatorPlugin object to determine hardware specs
* @param conf
* - the configuration for the NodeManager
* @return the number of cores to be used for YARN containers
*
*/
public static int getVCores(ResourceCalculatorPlugin plugin,
Configuration conf) {
int cores;
boolean hardwareDetectionEnabled =
conf.getBoolean(
YarnConfiguration.NM_ENABLE_HARDWARE_CAPABILITY_DETECTION,
YarnConfiguration.DEFAULT_NM_ENABLE_HARDWARE_CAPABILITY_DETECTION);
String message;
if (!hardwareDetectionEnabled || plugin == null) {
cores =
conf.getInt(YarnConfiguration.NM_VCORES,
YarnConfiguration.DEFAULT_NM_VCORES);
if (cores == -1) {
cores = YarnConfiguration.DEFAULT_NM_VCORES;
}
} else {
cores = conf.getInt(YarnConfiguration.NM_VCORES, -1);
if (cores == -1) {
float physicalCores =
NodeManagerHardwareUtils.getContainersCPUs(plugin, conf);
float multiplier =
conf.getFloat(YarnConfiguration.NM_PCORES_VCORES_MULTIPLIER,
YarnConfiguration.DEFAULT_NM_PCORES_VCORES_MULTIPLIER);
if (multiplier > 0) {
float tmp = physicalCores * multiplier;
if (tmp > 0 && tmp < 1) {
// on a single core machine - tmp can be between 0 and 1
cores = 1;
} else {
cores = (int) tmp;
}
} else {
message = "Illegal value for "
+ YarnConfiguration.NM_PCORES_VCORES_MULTIPLIER
+ ". Value must be greater than 0.";
throw new IllegalArgumentException(message);
}
}
}
if(cores <= 0) {
message = "Illegal value for " + YarnConfiguration.NM_VCORES
+ ". Value must be greater than 0.";
throw new IllegalArgumentException(message);
}
return cores;
}
/**
* Function to return how much memory we should set aside for YARN containers.
* If a number is specified in the configuration file, then that number is
* returned. If nothing is specified - 1. If the OS is an "unknown" OS(one for
* which we don't have ResourceCalculatorPlugin implemented), return the
* default NodeManager physical memory. 2. If the OS has a
* ResourceCalculatorPlugin implemented, the calculation is 0.8 * (RAM - 2 *
* JVM-memory) i.e. use 80% of the memory after accounting for memory used by
* the DataNode and the NodeManager. If the number is less than 1GB, log a
* warning message.
*
* @param conf
* - the configuration for the NodeManager
* @return the amount of memory that will be used for YARN containers in MB.
*/
public static int getContainerMemoryMB(Configuration conf) {
return NodeManagerHardwareUtils.getContainerMemoryMB(
ResourceCalculatorPlugin.getResourceCalculatorPlugin(null, conf), conf);
}
/**
* Function to return how much memory we should set aside for YARN containers.
* If a number is specified in the configuration file, then that number is
* returned. If nothing is specified - 1. If the OS is an "unknown" OS(one for
* which we don't have ResourceCalculatorPlugin implemented), return the
* default NodeManager physical memory. 2. If the OS has a
* ResourceCalculatorPlugin implemented, the calculation is 0.8 * (RAM - 2 *
* JVM-memory) i.e. use 80% of the memory after accounting for memory used by
* the DataNode and the NodeManager. If the number is less than 1GB, log a
* warning message.
*
* @param plugin
* - ResourceCalculatorPlugin object to determine hardware specs
* @param conf
* - the configuration for the NodeManager
* @return the amount of memory that will be used for YARN containers in MB.
*/
public static int getContainerMemoryMB(ResourceCalculatorPlugin plugin,
Configuration conf) {
int memoryMb;
boolean hardwareDetectionEnabled = conf.getBoolean(
YarnConfiguration.NM_ENABLE_HARDWARE_CAPABILITY_DETECTION,
YarnConfiguration.DEFAULT_NM_ENABLE_HARDWARE_CAPABILITY_DETECTION);
if (!hardwareDetectionEnabled || plugin == null) {
memoryMb = conf.getInt(YarnConfiguration.NM_PMEM_MB,
YarnConfiguration.DEFAULT_NM_PMEM_MB);
if (memoryMb == -1) {
memoryMb = YarnConfiguration.DEFAULT_NM_PMEM_MB;
}
} else {
memoryMb = conf.getInt(YarnConfiguration.NM_PMEM_MB, -1);
if (memoryMb == -1) {
int physicalMemoryMB =
(int) (plugin.getPhysicalMemorySize() / (1024 * 1024));
int hadoopHeapSizeMB =
(int) (Runtime.getRuntime().maxMemory() / (1024 * 1024));
int containerPhysicalMemoryMB =
(int) (0.8f * (physicalMemoryMB - (2 * hadoopHeapSizeMB)));
int reservedMemoryMB =
conf.getInt(YarnConfiguration.NM_SYSTEM_RESERVED_PMEM_MB, -1);
if (reservedMemoryMB != -1) {
containerPhysicalMemoryMB = physicalMemoryMB - reservedMemoryMB;
}
if(containerPhysicalMemoryMB <= 0) {
LOG.error("Calculated memory for YARN containers is too low."
+ " Node memory is " + physicalMemoryMB
+ " MB, system reserved memory is "
+ reservedMemoryMB + " MB.");
}
containerPhysicalMemoryMB = Math.max(containerPhysicalMemoryMB, 0);
memoryMb = containerPhysicalMemoryMB;
}
}
if(memoryMb <= 0) {
String message = "Illegal value for " + YarnConfiguration.NM_PMEM_MB
+ ". Value must be greater than 0.";
throw new IllegalArgumentException(message);
}
return memoryMb;
}
} }

View File

@ -108,18 +108,56 @@ public class TestContainerExecutor {
public void testRunCommandWithCpuAndMemoryResources() { public void testRunCommandWithCpuAndMemoryResources() {
// Windows only test // Windows only test
assumeTrue(Shell.WINDOWS); assumeTrue(Shell.WINDOWS);
int containerCores = 1;
Configuration conf = new Configuration(); Configuration conf = new Configuration();
conf.set(YarnConfiguration.NM_WINDOWS_CONTAINER_CPU_LIMIT_ENABLED, "true"); conf.set(YarnConfiguration.NM_WINDOWS_CONTAINER_CPU_LIMIT_ENABLED, "true");
conf.set(YarnConfiguration.NM_WINDOWS_CONTAINER_MEMORY_LIMIT_ENABLED, "true"); conf.set(YarnConfiguration.NM_WINDOWS_CONTAINER_MEMORY_LIMIT_ENABLED, "true");
String[] command = containerExecutor.getRunCommand("echo", "group1", null, null,
conf, Resource.newInstance(1024, 1)); String[] command =
float yarnProcessors = NodeManagerHardwareUtils.getContainersCores( containerExecutor.getRunCommand("echo", "group1", null, null, conf,
ResourceCalculatorPlugin.getResourceCalculatorPlugin(null, conf), Resource.newInstance(1024, 1));
conf); int nodeVCores = NodeManagerHardwareUtils.getVCores(conf);
int cpuRate = Math.min(10000, (int) ((1 * 10000) / yarnProcessors)); Assert.assertEquals(YarnConfiguration.DEFAULT_NM_VCORES, nodeVCores);
int cpuRate = Math.min(10000, (containerCores * 10000) / nodeVCores);
// Assert the cpu and memory limits are set correctly in the command // Assert the cpu and memory limits are set correctly in the command
String[] expected = { Shell.WINUTILS, "task", "create", "-m", "1024", "-c", String[] expected =
{Shell.WINUTILS, "task", "create", "-m", "1024", "-c",
String.valueOf(cpuRate), "group1", "cmd /c " + "echo" }; String.valueOf(cpuRate), "group1", "cmd /c " + "echo" };
Assert.assertTrue(Arrays.equals(expected, command)); Assert.assertEquals(Arrays.toString(expected), Arrays.toString(command));
conf.setBoolean(YarnConfiguration.NM_ENABLE_HARDWARE_CAPABILITY_DETECTION,
true);
int nodeCPUs = NodeManagerHardwareUtils.getNodeCPUs(conf);
float yarnCPUs = NodeManagerHardwareUtils.getContainersCPUs(conf);
nodeVCores = NodeManagerHardwareUtils.getVCores(conf);
Assert.assertEquals(nodeCPUs, (int) yarnCPUs);
Assert.assertEquals(nodeCPUs, nodeVCores);
command =
containerExecutor.getRunCommand("echo", "group1", null, null, conf,
Resource.newInstance(1024, 1));
cpuRate = Math.min(10000, (containerCores * 10000) / nodeVCores);
expected[6] = String.valueOf(cpuRate);
Assert.assertEquals(Arrays.toString(expected), Arrays.toString(command));
int yarnCpuLimit = 80;
conf.setInt(YarnConfiguration.NM_RESOURCE_PERCENTAGE_PHYSICAL_CPU_LIMIT,
yarnCpuLimit);
yarnCPUs = NodeManagerHardwareUtils.getContainersCPUs(conf);
nodeVCores = NodeManagerHardwareUtils.getVCores(conf);
Assert.assertEquals(nodeCPUs * 0.8, yarnCPUs, 0.01);
if (nodeCPUs == 1) {
Assert.assertEquals(1, nodeVCores);
} else {
Assert.assertEquals((int) (nodeCPUs * 0.8), nodeVCores);
}
command =
containerExecutor.getRunCommand("echo", "group1", null, null, conf,
Resource.newInstance(1024, 1));
// we should get 100 * (1/nodeVcores) of 80% of CPU
int containerPerc = (yarnCpuLimit * containerCores) / nodeVCores;
cpuRate = Math.min(10000, 100 * containerPerc);
expected[6] = String.valueOf(cpuRate);
Assert.assertEquals(Arrays.toString(expected), Arrays.toString(command));
} }
} }

View File

@ -160,6 +160,7 @@ public class TestCgroupsLCEResourcesHandler {
ResourceCalculatorPlugin plugin = ResourceCalculatorPlugin plugin =
Mockito.mock(ResourceCalculatorPlugin.class); Mockito.mock(ResourceCalculatorPlugin.class);
Mockito.doReturn(numProcessors).when(plugin).getNumProcessors(); Mockito.doReturn(numProcessors).when(plugin).getNumProcessors();
Mockito.doReturn(numProcessors).when(plugin).getNumCores();
handler.setConf(conf); handler.setConf(conf);
handler.initConfig(); handler.initConfig();
@ -256,6 +257,7 @@ public class TestCgroupsLCEResourcesHandler {
ResourceCalculatorPlugin plugin = ResourceCalculatorPlugin plugin =
Mockito.mock(ResourceCalculatorPlugin.class); Mockito.mock(ResourceCalculatorPlugin.class);
Mockito.doReturn(numProcessors).when(plugin).getNumProcessors(); Mockito.doReturn(numProcessors).when(plugin).getNumProcessors();
Mockito.doReturn(numProcessors).when(plugin).getNumCores();
handler.setConf(conf); handler.setConf(conf);
handler.initConfig(); handler.initConfig();

View File

@ -24,49 +24,170 @@ import org.junit.Assert;
import org.junit.Test; import org.junit.Test;
import org.mockito.Mockito; import org.mockito.Mockito;
/**
* Test the various functions provided by the NodeManagerHardwareUtils class.
*/
public class TestNodeManagerHardwareUtils { public class TestNodeManagerHardwareUtils {
static class TestResourceCalculatorPlugin extends ResourceCalculatorPlugin {
@Override
public long getVirtualMemorySize() {
return 0;
}
@Override
public long getPhysicalMemorySize() {
long ret = Runtime.getRuntime().maxMemory() * 2;
ret = ret + (4L * 1024 * 1024 * 1024);
return ret;
}
@Override
public long getAvailableVirtualMemorySize() {
return 0;
}
@Override
public long getAvailablePhysicalMemorySize() {
return 0;
}
@Override
public int getNumProcessors() {
return 8;
}
@Override
public long getCpuFrequency() {
return 0;
}
@Override
public long getCumulativeCpuTime() {
return 0;
}
@Override
public float getCpuUsage() {
return 0;
}
@Override
public int getNumCores() {
return 4;
}
}
@Test @Test
public void testGetContainerCores() { public void testGetContainerCPU() {
YarnConfiguration conf = new YarnConfiguration(); YarnConfiguration conf = new YarnConfiguration();
float ret; float ret;
final int numProcessors = 4; final int numProcessors = 8;
final int numCores = 4;
ResourceCalculatorPlugin plugin = ResourceCalculatorPlugin plugin =
Mockito.mock(ResourceCalculatorPlugin.class); Mockito.mock(ResourceCalculatorPlugin.class);
Mockito.doReturn(numProcessors).when(plugin).getNumProcessors(); Mockito.doReturn(numProcessors).when(plugin).getNumProcessors();
Mockito.doReturn(numCores).when(plugin).getNumCores();
conf.setInt(YarnConfiguration.NM_RESOURCE_PERCENTAGE_PHYSICAL_CPU_LIMIT, 0); conf.setInt(YarnConfiguration.NM_RESOURCE_PERCENTAGE_PHYSICAL_CPU_LIMIT, 0);
boolean catchFlag = false;
try { try {
NodeManagerHardwareUtils.getContainersCores(plugin, conf); NodeManagerHardwareUtils.getContainersCPUs(plugin, conf);
Assert.fail("getContainerCores should have thrown exception"); Assert.fail("getContainerCores should have thrown exception");
} catch (IllegalArgumentException ie) { } catch (IllegalArgumentException ie) {
// expected catchFlag = true;
} }
Assert.assertTrue(catchFlag);
conf.setInt(YarnConfiguration.NM_RESOURCE_PERCENTAGE_PHYSICAL_CPU_LIMIT, conf.setInt(YarnConfiguration.NM_RESOURCE_PERCENTAGE_PHYSICAL_CPU_LIMIT,
100); 100);
ret = NodeManagerHardwareUtils.getContainersCores(plugin, conf); ret = NodeManagerHardwareUtils.getContainersCPUs(plugin, conf);
Assert.assertEquals(4, (int) ret); Assert.assertEquals(4, (int) ret);
conf conf
.setInt(YarnConfiguration.NM_RESOURCE_PERCENTAGE_PHYSICAL_CPU_LIMIT, 50); .setInt(YarnConfiguration.NM_RESOURCE_PERCENTAGE_PHYSICAL_CPU_LIMIT, 50);
ret = NodeManagerHardwareUtils.getContainersCores(plugin, conf); ret = NodeManagerHardwareUtils.getContainersCPUs(plugin, conf);
Assert.assertEquals(2, (int) ret); Assert.assertEquals(2, (int) ret);
conf conf
.setInt(YarnConfiguration.NM_RESOURCE_PERCENTAGE_PHYSICAL_CPU_LIMIT, 75); .setInt(YarnConfiguration.NM_RESOURCE_PERCENTAGE_PHYSICAL_CPU_LIMIT, 75);
ret = NodeManagerHardwareUtils.getContainersCores(plugin, conf); ret = NodeManagerHardwareUtils.getContainersCPUs(plugin, conf);
Assert.assertEquals(3, (int) ret); Assert.assertEquals(3, (int) ret);
conf conf
.setInt(YarnConfiguration.NM_RESOURCE_PERCENTAGE_PHYSICAL_CPU_LIMIT, 85); .setInt(YarnConfiguration.NM_RESOURCE_PERCENTAGE_PHYSICAL_CPU_LIMIT, 85);
ret = NodeManagerHardwareUtils.getContainersCores(plugin, conf); ret = NodeManagerHardwareUtils.getContainersCPUs(plugin, conf);
Assert.assertEquals(3.4, ret, 0.1); Assert.assertEquals(3.4, ret, 0.1);
conf.setInt(YarnConfiguration.NM_RESOURCE_PERCENTAGE_PHYSICAL_CPU_LIMIT, conf.setInt(YarnConfiguration.NM_RESOURCE_PERCENTAGE_PHYSICAL_CPU_LIMIT,
110); 110);
ret = NodeManagerHardwareUtils.getContainersCores(plugin, conf); ret = NodeManagerHardwareUtils.getContainersCPUs(plugin, conf);
Assert.assertEquals(4, (int) ret); Assert.assertEquals(4, (int) ret);
} }
@Test
public void testGetVCores() {
ResourceCalculatorPlugin plugin = new TestResourceCalculatorPlugin();
YarnConfiguration conf = new YarnConfiguration();
conf.setFloat(YarnConfiguration.NM_PCORES_VCORES_MULTIPLIER, 1.25f);
int ret = NodeManagerHardwareUtils.getVCores(plugin, conf);
Assert.assertEquals(YarnConfiguration.DEFAULT_NM_VCORES, ret);
conf.setBoolean(YarnConfiguration.NM_ENABLE_HARDWARE_CAPABILITY_DETECTION,
true);
ret = NodeManagerHardwareUtils.getVCores(plugin, conf);
Assert.assertEquals(5, ret);
conf.setBoolean(YarnConfiguration.NM_COUNT_LOGICAL_PROCESSORS_AS_CORES,
true);
ret = NodeManagerHardwareUtils.getVCores(plugin, conf);
Assert.assertEquals(10, ret);
conf.setInt(YarnConfiguration.NM_VCORES, 10);
ret = NodeManagerHardwareUtils.getVCores(plugin, conf);
Assert.assertEquals(10, ret);
YarnConfiguration conf1 = new YarnConfiguration();
conf1.setBoolean(YarnConfiguration.NM_ENABLE_HARDWARE_CAPABILITY_DETECTION,
false);
conf.setInt(YarnConfiguration.NM_VCORES, 10);
ret = NodeManagerHardwareUtils.getVCores(plugin, conf);
Assert.assertEquals(10, ret);
}
@Test
public void testGetContainerMemoryMB() throws Exception {
ResourceCalculatorPlugin plugin = new TestResourceCalculatorPlugin();
long physicalMemMB = plugin.getPhysicalMemorySize() / (1024 * 1024);
YarnConfiguration conf = new YarnConfiguration();
conf.setBoolean(YarnConfiguration.NM_ENABLE_HARDWARE_CAPABILITY_DETECTION,
true);
int mem = NodeManagerHardwareUtils.getContainerMemoryMB(null, conf);
Assert.assertEquals(YarnConfiguration.DEFAULT_NM_PMEM_MB, mem);
mem = NodeManagerHardwareUtils.getContainerMemoryMB(plugin, conf);
int hadoopHeapSizeMB =
(int) (Runtime.getRuntime().maxMemory() / (1024 * 1024));
int calculatedMemMB =
(int) (0.8 * (physicalMemMB - (2 * hadoopHeapSizeMB)));
Assert.assertEquals(calculatedMemMB, mem);
conf.setInt(YarnConfiguration.NM_PMEM_MB, 1024);
mem = NodeManagerHardwareUtils.getContainerMemoryMB(conf);
Assert.assertEquals(1024, mem);
conf = new YarnConfiguration();
conf.setBoolean(YarnConfiguration.NM_ENABLE_HARDWARE_CAPABILITY_DETECTION,
false);
mem = NodeManagerHardwareUtils.getContainerMemoryMB(conf);
Assert.assertEquals(YarnConfiguration.DEFAULT_NM_PMEM_MB, mem);
conf.setInt(YarnConfiguration.NM_PMEM_MB, 10 * 1024);
mem = NodeManagerHardwareUtils.getContainerMemoryMB(conf);
Assert.assertEquals(10 * 1024, mem);
}
} }