YARN-3122. Metrics for container's actual CPU usage. (Anubhav Dhoot via kasha)

This commit is contained in:
Karthik Kambatla 2015-03-04 17:33:30 -08:00
parent 722b479469
commit 53947f37c7
12 changed files with 311 additions and 53 deletions

View File

@ -351,6 +351,9 @@ Release 2.7.0 - UNRELEASED
YARN-3272. Surface container locality info in RM web UI.
(Jian He via wangda)
YARN-3122. Metrics for container's actual CPU usage.
(Anubhav Dhoot via kasha)
OPTIMIZATIONS
YARN-2990. FairScheduler's delay-scheduling always waits for node-local and

View File

@ -0,0 +1,99 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.util;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import java.math.BigInteger;
@InterfaceAudience.Private
@InterfaceStability.Unstable
public class CpuTimeTracker {
public static final int UNAVAILABLE = -1;
final long MINIMUM_UPDATE_INTERVAL;
// CPU used time since system is on (ms)
BigInteger cumulativeCpuTime = BigInteger.ZERO;
// CPU used time read last time (ms)
BigInteger lastCumulativeCpuTime = BigInteger.ZERO;
// Unix timestamp while reading the CPU time (ms)
long sampleTime;
long lastSampleTime;
float cpuUsage;
BigInteger jiffyLengthInMillis;
public CpuTimeTracker(long jiffyLengthInMillis) {
this.jiffyLengthInMillis = BigInteger.valueOf(jiffyLengthInMillis);
this.cpuUsage = UNAVAILABLE;
this.sampleTime = UNAVAILABLE;
this.lastSampleTime = UNAVAILABLE;
MINIMUM_UPDATE_INTERVAL = 10 * jiffyLengthInMillis;
}
/**
* Return percentage of cpu time spent over the time since last update.
* CPU time spent is based on elapsed jiffies multiplied by amount of
* time for 1 core. Thus, if you use 2 cores completely you would have spent
* twice the actual time between updates and this will return 200%.
*
* @return Return percentage of cpu usage since last update, {@link
* CpuTimeTracker#UNAVAILABLE} if there haven't been 2 updates more than
* {@link CpuTimeTracker#MINIMUM_UPDATE_INTERVAL} apart
*/
public float getCpuTrackerUsagePercent() {
if (lastSampleTime == UNAVAILABLE ||
lastSampleTime > sampleTime) {
// lastSampleTime > sampleTime may happen when the system time is changed
lastSampleTime = sampleTime;
lastCumulativeCpuTime = cumulativeCpuTime;
return cpuUsage;
}
// When lastSampleTime is sufficiently old, update cpuUsage.
// Also take a sample of the current time and cumulative CPU time for the
// use of the next calculation.
if (sampleTime > lastSampleTime + MINIMUM_UPDATE_INTERVAL) {
cpuUsage =
((cumulativeCpuTime.subtract(lastCumulativeCpuTime)).floatValue())
* 100F / ((float) (sampleTime - lastSampleTime));
lastSampleTime = sampleTime;
lastCumulativeCpuTime = cumulativeCpuTime;
}
return cpuUsage;
}
public void updateElapsedJiffies(BigInteger elapedJiffies, long sampleTime) {
this.cumulativeCpuTime = elapedJiffies.multiply(jiffyLengthInMillis);
this.sampleTime = sampleTime;
}
@Override
public String toString() {
StringBuilder sb = new StringBuilder();
sb.append("SampleTime " + this.sampleTime);
sb.append(" CummulativeCpuTime " + this.cumulativeCpuTime);
sb.append(" LastSampleTime " + this.lastSampleTime);
sb.append(" LastCummulativeCpuTime " + this.lastCumulativeCpuTime);
sb.append(" CpuUsage " + this.cpuUsage);
sb.append(" JiffyLengthMillisec " + this.jiffyLengthInMillis);
return sb.toString();
}
}

View File

@ -23,6 +23,7 @@ import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.InputStreamReader;
import java.io.IOException;
import java.math.BigInteger;
import java.nio.charset.Charset;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
@ -41,8 +42,6 @@ public class LinuxResourceCalculatorPlugin extends ResourceCalculatorPlugin {
private static final Log LOG =
LogFactory.getLog(LinuxResourceCalculatorPlugin.class);
public static final int UNAVAILABLE = -1;
/**
* proc's meminfo virtual file has keys-values in the format
* "key:[ \t]*value[ \t]kB".
@ -74,6 +73,7 @@ public class LinuxResourceCalculatorPlugin extends ResourceCalculatorPlugin {
private static final Pattern CPU_TIME_FORMAT =
Pattern.compile("^cpu[ \t]*([0-9]*)" +
"[ \t]*([0-9]*)[ \t]*([0-9]*)[ \t].*");
private CpuTimeTracker cpuTimeTracker;
private String procfsMemFile;
private String procfsCpuFile;
@ -87,12 +87,6 @@ public class LinuxResourceCalculatorPlugin extends ResourceCalculatorPlugin {
private long inactiveSize = 0; // inactive cache memory (kB)
private int numProcessors = 0; // number of processors on the system
private long cpuFrequency = 0L; // CPU frequency on the system (kHz)
private long cumulativeCpuTime = 0L; // CPU used time since system is on (ms)
private long lastCumulativeCpuTime = 0L; // CPU used time read last time (ms)
// Unix timestamp while reading the CPU time (ms)
private float cpuUsage = UNAVAILABLE;
private long sampleTime = UNAVAILABLE;
private long lastSampleTime = UNAVAILABLE;
boolean readMemInfoFile = false;
boolean readCpuInfoFile = false;
@ -106,10 +100,8 @@ public class LinuxResourceCalculatorPlugin extends ResourceCalculatorPlugin {
}
public LinuxResourceCalculatorPlugin() {
procfsMemFile = PROCFS_MEMFILE;
procfsCpuFile = PROCFS_CPUINFO;
procfsStatFile = PROCFS_STAT;
jiffyLengthInMillis = ProcfsBasedProcessTree.JIFFY_LENGTH_IN_MILLIS;
this(PROCFS_MEMFILE, PROCFS_CPUINFO, PROCFS_STAT,
ProcfsBasedProcessTree.JIFFY_LENGTH_IN_MILLIS);
}
/**
@ -128,6 +120,7 @@ public class LinuxResourceCalculatorPlugin extends ResourceCalculatorPlugin {
this.procfsCpuFile = procfsCpuFile;
this.procfsStatFile = procfsStatFile;
this.jiffyLengthInMillis = jiffyLengthInMillis;
this.cpuTimeTracker = new CpuTimeTracker(jiffyLengthInMillis);
}
/**
@ -276,12 +269,13 @@ public class LinuxResourceCalculatorPlugin extends ResourceCalculatorPlugin {
long uTime = Long.parseLong(mat.group(1));
long nTime = Long.parseLong(mat.group(2));
long sTime = Long.parseLong(mat.group(3));
cumulativeCpuTime = uTime + nTime + sTime; // milliseconds
cpuTimeTracker.updateElapsedJiffies(
BigInteger.valueOf(uTime + nTime + sTime),
getCurrentTime());
break;
}
str = in.readLine();
}
cumulativeCpuTime *= jiffyLengthInMillis;
} catch (IOException io) {
LOG.warn("Error reading the stream " + io);
} finally {
@ -345,32 +339,18 @@ public class LinuxResourceCalculatorPlugin extends ResourceCalculatorPlugin {
@Override
public long getCumulativeCpuTime() {
readProcStatFile();
return cumulativeCpuTime;
return cpuTimeTracker.cumulativeCpuTime.longValue();
}
/** {@inheritDoc} */
@Override
public float getCpuUsage() {
readProcStatFile();
sampleTime = getCurrentTime();
if (lastSampleTime == UNAVAILABLE ||
lastSampleTime > sampleTime) {
// lastSampleTime > sampleTime may happen when the system time is changed
lastSampleTime = sampleTime;
lastCumulativeCpuTime = cumulativeCpuTime;
return cpuUsage;
float overallCpuUsage = cpuTimeTracker.getCpuTrackerUsagePercent();
if (overallCpuUsage != CpuTimeTracker.UNAVAILABLE) {
overallCpuUsage = overallCpuUsage / getNumProcessors();
}
// When lastSampleTime is sufficiently old, update cpuUsage.
// Also take a sample of the current time and cumulative CPU time for the
// use of the next calculation.
final long MINIMUM_UPDATE_INTERVAL = 10 * jiffyLengthInMillis;
if (sampleTime > lastSampleTime + MINIMUM_UPDATE_INTERVAL) {
cpuUsage = (float)(cumulativeCpuTime - lastCumulativeCpuTime) * 100F /
((float)(sampleTime - lastSampleTime) * getNumProcessors());
lastSampleTime = sampleTime;
lastCumulativeCpuTime = cumulativeCpuTime;
}
return cpuUsage;
return overallCpuUsage;
}
/**

View File

@ -66,6 +66,8 @@ public class ProcfsBasedProcessTree extends ResourceCalculatorProcessTree {
public static final String PROCFS_CMDLINE_FILE = "cmdline";
public static final long PAGE_SIZE;
public static final long JIFFY_LENGTH_IN_MILLIS; // in millisecond
private final CpuTimeTracker cpuTimeTracker;
private Clock clock;
enum MemInfo {
SIZE("Size"), RSS("Rss"), PSS("Pss"), SHARED_CLEAN("Shared_Clean"),
@ -144,7 +146,7 @@ public class ProcfsBasedProcessTree extends ResourceCalculatorProcessTree {
new HashMap<String, ProcessInfo>();
public ProcfsBasedProcessTree(String pid) {
this(pid, PROCFS);
this(pid, PROCFS, new SystemClock());
}
@Override
@ -157,6 +159,10 @@ public class ProcfsBasedProcessTree extends ResourceCalculatorProcessTree {
}
}
public ProcfsBasedProcessTree(String pid, String procfsDir) {
this(pid, procfsDir, new SystemClock());
}
/**
* Build a new process tree rooted at the pid.
*
@ -165,11 +171,14 @@ public class ProcfsBasedProcessTree extends ResourceCalculatorProcessTree {
*
* @param pid root of the process tree
* @param procfsDir the root of a proc file system - only used for testing.
* @param clock clock for controlling time for testing
*/
public ProcfsBasedProcessTree(String pid, String procfsDir) {
public ProcfsBasedProcessTree(String pid, String procfsDir, Clock clock) {
super(pid);
this.clock = clock;
this.pid = getValidPID(pid);
this.procfsDir = procfsDir;
this.cpuTimeTracker = new CpuTimeTracker(JIFFY_LENGTH_IN_MILLIS);
}
/**
@ -447,6 +456,26 @@ public class ProcfsBasedProcessTree extends ResourceCalculatorProcessTree {
return cpuTime;
}
private BigInteger getTotalProcessJiffies() {
BigInteger totalStime = BigInteger.ZERO;
long totalUtime = 0;
for (ProcessInfo p : processTree.values()) {
if (p != null) {
totalUtime += p.getUtime();
totalStime = totalStime.add(p.getStime());
}
}
return totalStime.add(BigInteger.valueOf(totalUtime));
}
@Override
public float getCpuUsagePercent() {
BigInteger processTotalJiffies = getTotalProcessJiffies();
cpuTimeTracker.updateElapsedJiffies(processTotalJiffies,
clock.getTime());
return cpuTimeTracker.getCpuTrackerUsagePercent();
}
private static String getValidPID(String pid) {
if (pid == null) return deadPid;
Matcher m = numberPattern.matcher(pid);
@ -962,4 +991,48 @@ public class ProcfsBasedProcessTree extends ResourceCalculatorProcessTree {
return sb.toString();
}
}
/**
* Test the {@link ProcfsBasedProcessTree}
*
* @param args
*/
public static void main(String[] args) {
if (args.length != 1) {
System.out.println("Provide <pid of process to monitor>");
return;
}
int numprocessors =
ResourceCalculatorPlugin.getResourceCalculatorPlugin(null, null)
.getNumProcessors();
System.out.println("Number of processors " + numprocessors);
System.out.println("Creating ProcfsBasedProcessTree for process " +
args[0]);
ProcfsBasedProcessTree procfsBasedProcessTree = new
ProcfsBasedProcessTree(args[0]);
procfsBasedProcessTree.updateProcessTree();
System.out.println(procfsBasedProcessTree.getProcessTreeDump());
System.out.println("Get cpu usage " + procfsBasedProcessTree
.getCpuUsagePercent());
try {
// Sleep so we can compute the CPU usage
Thread.sleep(500L);
} catch (InterruptedException e) {
// do nothing
}
procfsBasedProcessTree.updateProcessTree();
System.out.println(procfsBasedProcessTree.getProcessTreeDump());
System.out.println("Cpu usage " + procfsBasedProcessTree
.getCpuUsagePercent());
System.out.println("Vmem usage in bytes " + procfsBasedProcessTree
.getCumulativeVmem());
System.out.println("Rss mem usage in bytes " + procfsBasedProcessTree
.getCumulativeRssmem());
}
}

View File

@ -108,13 +108,23 @@ public abstract class ResourceCalculatorProcessTree extends Configured {
/**
* Get the CPU time in millisecond used by all the processes in the
* process-tree since the process-tree created
* process-tree since the process-tree was created
*
* @return cumulative CPU time in millisecond since the process-tree created
* return 0 if it cannot be calculated
*/
public abstract long getCumulativeCpuTime();
/**
* Get the CPU usage by all the processes in the process-tree based on
* average between samples as a ratio of overall CPU cycles similar to top.
* Thus, if 2 out of 4 cores are used this should return 200.0.
*
* @return percentage CPU usage since the process-tree was created
* return {@link CpuTimeTracker#UNAVAILABLE} if it cannot be calculated
*/
public abstract float getCpuUsagePercent();
/** Verify that the tree process id is same as its process group id.
* @return true if the process id matches else return false.
*/

View File

@ -34,7 +34,7 @@ public class WindowsBasedProcessTree extends ResourceCalculatorProcessTree {
static final Log LOG = LogFactory
.getLog(WindowsBasedProcessTree.class);
static class ProcessInfo {
String pid; // process pid
long vmem; // virtual memory
@ -202,4 +202,9 @@ public class WindowsBasedProcessTree extends ResourceCalculatorProcessTree {
return cpuTimeMs;
}
@Override
public float getCpuUsagePercent() {
return CpuTimeTracker.UNAVAILABLE;
}
}

View File

@ -171,8 +171,8 @@ public class TestLinuxResourceCalculatorPlugin {
updateStatFile(uTime, nTime, sTime);
assertEquals(plugin.getCumulativeCpuTime(),
FAKE_JIFFY_LENGTH * (uTime + nTime + sTime));
assertEquals(plugin.getCpuUsage(), (float)(LinuxResourceCalculatorPlugin.UNAVAILABLE),0.0);
assertEquals(plugin.getCpuUsage(), (float)(CpuTimeTracker.UNAVAILABLE),0.0);
// Advance the time and sample again to test the CPU usage calculation
uTime += 100L;
plugin.advanceTime(200L);

View File

@ -236,8 +236,8 @@ public class TestProcfsBasedProcessTree {
}
protected ProcfsBasedProcessTree createProcessTree(String pid,
String procfsRootDir) {
return new ProcfsBasedProcessTree(pid, procfsRootDir);
String procfsRootDir, Clock clock) {
return new ProcfsBasedProcessTree(pid, procfsRootDir, clock);
}
protected void destroyProcessTree(String pid) throws IOException {
@ -388,6 +388,8 @@ public class TestProcfsBasedProcessTree {
// test processes
String[] pids = { "100", "200", "300", "400" };
ControlledClock testClock = new ControlledClock(new SystemClock());
testClock.setTime(0);
// create the fake procfs root directory.
File procfsRootDir = new File(TEST_ROOT_DIR, "proc");
@ -422,7 +424,7 @@ public class TestProcfsBasedProcessTree {
// crank up the process tree class.
Configuration conf = new Configuration();
ProcfsBasedProcessTree processTree =
createProcessTree("100", procfsRootDir.getAbsolutePath());
createProcessTree("100", procfsRootDir.getAbsolutePath(), testClock);
processTree.setConf(conf);
// build the process tree.
processTree.updateProcessTree();
@ -444,6 +446,12 @@ public class TestProcfsBasedProcessTree {
? 7200L * ProcfsBasedProcessTree.JIFFY_LENGTH_IN_MILLIS : 0L;
Assert.assertEquals("Cumulative cpu time does not match", cumuCpuTime,
processTree.getCumulativeCpuTime());
// verify CPU usage
Assert.assertEquals("Percent CPU time should be set to -1 initially",
-1.0, processTree.getCpuUsagePercent(),
0.01);
// Check by enabling smaps
setSmapsInProceTree(processTree, true);
// RSS=Min(shared_dirty,PSS)+PrivateClean+PrivateDirty (exclude r-xs,
@ -460,15 +468,31 @@ public class TestProcfsBasedProcessTree {
"100", "200000", "200", "3000", "500" });
writeStatFiles(procfsRootDir, pids, procInfos, memInfo);
long elapsedTimeBetweenUpdatesMsec = 200000;
testClock.setTime(elapsedTimeBetweenUpdatesMsec);
// build the process tree.
processTree.updateProcessTree();
// verify cumulative cpu time again
long prevCumuCpuTime = cumuCpuTime;
cumuCpuTime =
ProcfsBasedProcessTree.JIFFY_LENGTH_IN_MILLIS > 0
? 9400L * ProcfsBasedProcessTree.JIFFY_LENGTH_IN_MILLIS : 0L;
Assert.assertEquals("Cumulative cpu time does not match", cumuCpuTime,
processTree.getCumulativeCpuTime());
double expectedCpuUsagePercent =
(ProcfsBasedProcessTree.JIFFY_LENGTH_IN_MILLIS > 0) ?
(cumuCpuTime - prevCumuCpuTime) * 100.0 /
elapsedTimeBetweenUpdatesMsec : 0;
// expectedCpuUsagePercent is given by (94000L - 72000) * 100/
// 200000;
// which in this case is 11. Lets verify that first
Assert.assertEquals(11, expectedCpuUsagePercent, 0.001);
Assert.assertEquals("Percent CPU time is not correct expected " +
expectedCpuUsagePercent, expectedCpuUsagePercent,
processTree.getCpuUsagePercent(),
0.01);
} finally {
FileUtil.fullyDelete(procfsRootDir);
}
@ -535,7 +559,8 @@ public class TestProcfsBasedProcessTree {
// crank up the process tree class.
ProcfsBasedProcessTree processTree =
createProcessTree("100", procfsRootDir.getAbsolutePath());
createProcessTree("100", procfsRootDir.getAbsolutePath(),
new SystemClock());
setSmapsInProceTree(processTree, smapEnabled);
// verify cumulative memory
@ -672,7 +697,7 @@ public class TestProcfsBasedProcessTree {
setupProcfsRootDir(procfsRootDir);
// crank up the process tree class.
createProcessTree(pid, procfsRootDir.getAbsolutePath());
createProcessTree(pid, procfsRootDir.getAbsolutePath(), new SystemClock());
// Let us not create stat file for pid 100.
Assert.assertTrue(ProcfsBasedProcessTree.checkPidPgrpidForMatch(pid,
@ -741,7 +766,8 @@ public class TestProcfsBasedProcessTree {
writeCmdLineFiles(procfsRootDir, pids, cmdLines);
ProcfsBasedProcessTree processTree =
createProcessTree("100", procfsRootDir.getAbsolutePath());
createProcessTree("100", procfsRootDir.getAbsolutePath(),
new SystemClock());
// build the process tree.
processTree.updateProcessTree();

View File

@ -53,6 +53,11 @@ public class TestResourceCalculatorProcessTree {
return 0;
}
@Override
public float getCpuUsagePercent() {
return CpuTimeTracker.UNAVAILABLE;
}
public boolean checkPidPgrpidForMatch() {
return false;
}

View File

@ -42,14 +42,29 @@ import static org.apache.hadoop.metrics2.lib.Interns.info;
@Metrics(context="container")
public class ContainerMetrics implements MetricsSource {
public static final String PMEM_LIMIT_METRIC_NAME = "pMemLimit";
public static final String VMEM_LIMIT_METRIC_NAME = "vMemLimit";
public static final String PMEM_LIMIT_METRIC_NAME = "pMemLimitMBs";
public static final String VMEM_LIMIT_METRIC_NAME = "vMemLimitMBs";
public static final String VCORE_LIMIT_METRIC_NAME = "vCoreLimit";
public static final String PMEM_USAGE_METRIC_NAME = "pMemUsage";
public static final String PMEM_USAGE_METRIC_NAME = "pMemUsageMBs";
private static final String PHY_CPU_USAGE_METRIC_NAME = "pCpuUsagePercent";
// Use a multiplier of 1000 to avoid losing too much precision when
// converting to integers
private static final String VCORE_USAGE_METRIC_NAME = "milliVcoreUsage";
@Metric
public MutableStat pMemMBsStat;
// This tracks overall CPU percentage of the machine in terms of percentage
// of 1 core similar to top
// Thus if you use 2 cores completely out of 4 available cores this value
// will be 200
@Metric
public MutableStat cpuCoreUsagePercent;
@Metric
public MutableStat milliVcoresUsed;
@Metric
public MutableGaugeInt pMemLimitMbs;
@ -57,7 +72,7 @@ public class ContainerMetrics implements MetricsSource {
public MutableGaugeInt vMemLimitMbs;
@Metric
public MutableGaugeInt cpuVcores;
public MutableGaugeInt cpuVcoreLimit;
static final MetricsInfo RECORD_INFO =
info("ContainerResource", "Resource limit and usage by container");
@ -95,11 +110,17 @@ public class ContainerMetrics implements MetricsSource {
this.pMemMBsStat = registry.newStat(
PMEM_USAGE_METRIC_NAME, "Physical memory stats", "Usage", "MBs", true);
this.cpuCoreUsagePercent = registry.newStat(
PHY_CPU_USAGE_METRIC_NAME, "Physical Cpu core percent usage stats",
"Usage", "Percents", true);
this.milliVcoresUsed = registry.newStat(
VCORE_USAGE_METRIC_NAME, "1000 times Vcore usage", "Usage",
"MilliVcores", true);
this.pMemLimitMbs = registry.newGauge(
PMEM_LIMIT_METRIC_NAME, "Physical memory limit in MBs", 0);
this.vMemLimitMbs = registry.newGauge(
VMEM_LIMIT_METRIC_NAME, "Virtual memory limit in MBs", 0);
this.cpuVcores = registry.newGauge(
this.cpuVcoreLimit = registry.newGauge(
VCORE_LIMIT_METRIC_NAME, "CPU limit in number of vcores", 0);
}
@ -170,6 +191,12 @@ public class ContainerMetrics implements MetricsSource {
this.pMemMBsStat.add(memoryMBs);
}
public void recordCpuUsage(
int totalPhysicalCpuPercent, int milliVcoresUsed) {
this.cpuCoreUsagePercent.add(totalPhysicalCpuPercent);
this.milliVcoresUsed.add(milliVcoresUsed);
}
public void recordProcessId(String processId) {
registry.tag(PROCESSID_INFO, processId);
}
@ -177,7 +204,7 @@ public class ContainerMetrics implements MetricsSource {
public void recordResourceLimit(int vmemLimit, int pmemLimit, int cpuVcores) {
this.vMemLimitMbs.set(vmemLimit);
this.pMemLimitMbs.set(pmemLimit);
this.cpuVcores.set(cpuVcores);
this.cpuVcoreLimit.set(cpuVcores);
}
private synchronized void scheduleTimerTaskIfRequired() {

View File

@ -38,6 +38,7 @@ import org.apache.hadoop.yarn.event.Dispatcher;
import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor;
import org.apache.hadoop.yarn.server.nodemanager.Context;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerKillEvent;
import org.apache.hadoop.yarn.server.nodemanager.util.NodeManagerHardwareUtils;
import org.apache.hadoop.yarn.util.ResourceCalculatorProcessTree;
import org.apache.hadoop.yarn.util.ResourceCalculatorPlugin;
@ -75,6 +76,7 @@ public class ContainersMonitorImpl extends AbstractService implements
private long maxVCoresAllottedForContainers;
private static final long UNKNOWN_MEMORY_LIMIT = -1L;
private int nodeCpuPercentageForYARN;
public ContainersMonitorImpl(ContainerExecutor exec,
AsyncDispatcher dispatcher, Context context) {
@ -145,6 +147,9 @@ public class ContainersMonitorImpl extends AbstractService implements
LOG.info("Physical memory check enabled: " + pmemCheckEnabled);
LOG.info("Virtual memory check enabled: " + vmemCheckEnabled);
nodeCpuPercentageForYARN =
NodeManagerHardwareUtils.getNodeCpuPercentage(conf);
if (pmemCheckEnabled) {
// Logging if actual pmem cannot be determined.
long totalPhysicalMemoryOnNM = UNKNOWN_MEMORY_LIMIT;
@ -434,6 +439,16 @@ public class ContainersMonitorImpl extends AbstractService implements
pTree.updateProcessTree(); // update process-tree
long currentVmemUsage = pTree.getCumulativeVmem();
long currentPmemUsage = pTree.getCumulativeRssmem();
// if machine has 6 cores and 3 are used,
// cpuUsagePercentPerCore should be 300% and
// cpuUsageTotalCoresPercentage should be 50%
float cpuUsagePercentPerCore = pTree.getCpuUsagePercent();
float cpuUsageTotalCoresPercentage = cpuUsagePercentPerCore /
resourceCalculatorPlugin.getNumProcessors();
// Multiply by 1000 to avoid losing data when converting to int
int milliVcoresUsed = (int) (cpuUsageTotalCoresPercentage * 1000
* maxVCoresAllottedForContainers /nodeCpuPercentageForYARN);
// as processes begin with an age 1, we want to see if there
// are processes more than 1 iteration old.
long curMemUsageOfAgedProcesses = pTree.getCumulativeVmem(1);
@ -451,6 +466,9 @@ public class ContainersMonitorImpl extends AbstractService implements
ContainerMetrics.forContainer(
containerId, containerMetricsPeriodMs).recordMemoryUsage(
(int) (currentPmemUsage >> 20));
ContainerMetrics.forContainer(
containerId, containerMetricsPeriodMs).recordCpuUsage
((int)cpuUsagePercentPerCore, milliVcoresUsed);
}
boolean isMemoryOverLimit = false;

View File

@ -59,6 +59,19 @@ public class NodeManagerHardwareUtils {
public static float getContainersCores(ResourceCalculatorPlugin plugin,
Configuration conf) {
int numProcessors = plugin.getNumProcessors();
int nodeCpuPercentage = getNodeCpuPercentage(conf);
return (nodeCpuPercentage * numProcessors) / 100.0f;
}
/**
* Gets the percentage of physical CPU that is configured for YARN containers
* This is percent > 0 and <= 100 based on
* YarnConfiguration.NM_RESOURCE_PERCENTAGE_PHYSICAL_CPU_LIMIT
* @param conf Configuration object
* @return percent > 0 and <= 100
*/
public static int getNodeCpuPercentage(Configuration conf) {
int nodeCpuPercentage =
Math.min(conf.getInt(
YarnConfiguration.NM_RESOURCE_PERCENTAGE_PHYSICAL_CPU_LIMIT,
@ -73,7 +86,6 @@ public class NodeManagerHardwareUtils {
+ ". Value cannot be less than or equal to 0.";
throw new IllegalArgumentException(message);
}
return (nodeCpuPercentage * numProcessors) / 100.0f;
return nodeCpuPercentage;
}
}