YARN-3304. Cleaning up ResourceCalculatorProcessTree APIs for public use and removing inconsistencies in the default values. Contributed by Junping Du and Karthik Kambatla.

(cherry picked from commit c358368f51)
This commit is contained in:
Vinod Kumar Vavilapalli 2015-03-30 10:09:40 -07:00
parent a0ed29a058
commit 35af6f1802
11 changed files with 187 additions and 154 deletions

View File

@ -170,7 +170,7 @@ abstract public class Task implements Writable, Configurable {
skipRanges.skipRangeIterator(); skipRanges.skipRangeIterator();
private ResourceCalculatorProcessTree pTree; private ResourceCalculatorProcessTree pTree;
private long initCpuCumulativeTime = 0; private long initCpuCumulativeTime = ResourceCalculatorProcessTree.UNAVAILABLE;
protected JobConf conf; protected JobConf conf;
protected MapOutputFile mapOutputFile; protected MapOutputFile mapOutputFile;
@ -844,13 +844,25 @@ abstract public class Task implements Writable, Configurable {
} }
pTree.updateProcessTree(); pTree.updateProcessTree();
long cpuTime = pTree.getCumulativeCpuTime(); long cpuTime = pTree.getCumulativeCpuTime();
long pMem = pTree.getCumulativeRssmem(); long pMem = pTree.getRssMemorySize();
long vMem = pTree.getCumulativeVmem(); long vMem = pTree.getVirtualMemorySize();
// Remove the CPU time consumed previously by JVM reuse // Remove the CPU time consumed previously by JVM reuse
cpuTime -= initCpuCumulativeTime; if (cpuTime != ResourceCalculatorProcessTree.UNAVAILABLE &&
counters.findCounter(TaskCounter.CPU_MILLISECONDS).setValue(cpuTime); initCpuCumulativeTime != ResourceCalculatorProcessTree.UNAVAILABLE) {
counters.findCounter(TaskCounter.PHYSICAL_MEMORY_BYTES).setValue(pMem); cpuTime -= initCpuCumulativeTime;
counters.findCounter(TaskCounter.VIRTUAL_MEMORY_BYTES).setValue(vMem); }
if (cpuTime != ResourceCalculatorProcessTree.UNAVAILABLE) {
counters.findCounter(TaskCounter.CPU_MILLISECONDS).setValue(cpuTime);
}
if (pMem != ResourceCalculatorProcessTree.UNAVAILABLE) {
counters.findCounter(TaskCounter.PHYSICAL_MEMORY_BYTES).setValue(pMem);
}
if (vMem != ResourceCalculatorProcessTree.UNAVAILABLE) {
counters.findCounter(TaskCounter.VIRTUAL_MEMORY_BYTES).setValue(vMem);
}
} }
/** /**

View File

@ -741,6 +741,10 @@ Release 2.7.0 - UNRELEASED
YARN-2213. Change proxy-user cookie log in AmIpFilter to DEBUG. YARN-2213. Change proxy-user cookie log in AmIpFilter to DEBUG.
(Varun Saxena via xgong) (Varun Saxena via xgong)
YARN-3304. Cleaning up ResourceCalculatorProcessTree APIs for public use and
removing inconsistencies in the default values. (Junping Du and Karthik
Kambatla via vinodkv)
Release 2.6.0 - 2014-11-18 Release 2.6.0 - 2014-11-18
INCOMPATIBLE CHANGES INCOMPATIBLE CHANGES

View File

@ -26,7 +26,8 @@ import java.math.BigInteger;
@InterfaceAudience.Private @InterfaceAudience.Private
@InterfaceStability.Unstable @InterfaceStability.Unstable
public class CpuTimeTracker { public class CpuTimeTracker {
public static final int UNAVAILABLE = -1; public static final int UNAVAILABLE =
ResourceCalculatorProcessTree.UNAVAILABLE;
final long MINIMUM_UPDATE_INTERVAL; final long MINIMUM_UPDATE_INTERVAL;
// CPU used time since system is on (ms) // CPU used time since system is on (ms)

View File

@ -140,7 +140,7 @@ public class ProcfsBasedProcessTree extends ResourceCalculatorProcessTree {
static private String deadPid = "-1"; static private String deadPid = "-1";
private String pid = deadPid; private String pid = deadPid;
static private Pattern numberPattern = Pattern.compile("[1-9][0-9]*"); static private Pattern numberPattern = Pattern.compile("[1-9][0-9]*");
private Long cpuTime = 0L; private long cpuTime = UNAVAILABLE;
protected Map<String, ProcessInfo> processTree = protected Map<String, ProcessInfo> processTree =
new HashMap<String, ProcessInfo>(); new HashMap<String, ProcessInfo>();
@ -340,66 +340,53 @@ public class ProcfsBasedProcessTree extends ResourceCalculatorProcessTree {
return ret.toString(); return ret.toString();
} }
/**
* Get the cumulative virtual memory used by all the processes in the
* process-tree that are older than the passed in age.
*
* @param olderThanAge processes above this age are included in the
* memory addition
* @return cumulative virtual memory used by the process-tree in bytes,
* for processes older than this age.
*/
@Override @Override
public long getCumulativeVmem(int olderThanAge) { public long getVirtualMemorySize(int olderThanAge) {
long total = 0; long total = UNAVAILABLE;
for (ProcessInfo p : processTree.values()) { for (ProcessInfo p : processTree.values()) {
if ((p != null) && (p.getAge() > olderThanAge)) { if ((p != null) && (p.getAge() > olderThanAge)) {
if (total == UNAVAILABLE ) {
total = 0;
}
total += p.getVmem(); total += p.getVmem();
} }
} }
return total; return total;
} }
/**
* Get the cumulative resident set size (rss) memory used by all the processes
* in the process-tree that are older than the passed in age.
*
* @param olderThanAge processes above this age are included in the
* memory addition
* @return cumulative rss memory used by the process-tree in bytes,
* for processes older than this age. return 0 if it cannot be
* calculated
*/
@Override @Override
public long getCumulativeRssmem(int olderThanAge) { public long getRssMemorySize(int olderThanAge) {
if (PAGE_SIZE < 0) { if (PAGE_SIZE < 0) {
return 0; return UNAVAILABLE;
} }
if (smapsEnabled) { if (smapsEnabled) {
return getSmapBasedCumulativeRssmem(olderThanAge); return getSmapBasedRssMemorySize(olderThanAge);
} }
boolean isAvailable = false;
long totalPages = 0; long totalPages = 0;
for (ProcessInfo p : processTree.values()) { for (ProcessInfo p : processTree.values()) {
if ((p != null) && (p.getAge() > olderThanAge)) { if ((p != null) && (p.getAge() > olderThanAge)) {
totalPages += p.getRssmemPage(); totalPages += p.getRssmemPage();
isAvailable = true;
} }
} }
return totalPages * PAGE_SIZE; // convert # pages to byte return isAvailable ? totalPages * PAGE_SIZE : UNAVAILABLE; // convert # pages to byte
} }
/** /**
* Get the cumulative resident set size (RSS) memory used by all the processes * Get the resident set size (RSS) memory used by all the processes
* in the process-tree that are older than the passed in age. RSS is * in the process-tree that are older than the passed in age. RSS is
* calculated based on SMAP information. Skip mappings with "r--s", "r-xs" * calculated based on SMAP information. Skip mappings with "r--s", "r-xs"
* permissions to get real RSS usage of the process. * permissions to get real RSS usage of the process.
* *
* @param olderThanAge * @param olderThanAge
* processes above this age are included in the memory addition * processes above this age are included in the memory addition
* @return cumulative rss memory used by the process-tree in bytes, for * @return rss memory used by the process-tree in bytes, for
* processes older than this age. return 0 if it cannot be calculated * processes older than this age. return {@link #UNAVAILABLE} if it cannot
* be calculated.
*/ */
private long getSmapBasedCumulativeRssmem(int olderThanAge) { private long getSmapBasedRssMemorySize(int olderThanAge) {
long total = 0; long total = UNAVAILABLE;
for (ProcessInfo p : processTree.values()) { for (ProcessInfo p : processTree.values()) {
if ((p != null) && (p.getAge() > olderThanAge)) { if ((p != null) && (p.getAge() > olderThanAge)) {
ProcessTreeSmapMemInfo procMemInfo = processSMAPTree.get(p.getPid()); ProcessTreeSmapMemInfo procMemInfo = processSMAPTree.get(p.getPid());
@ -412,6 +399,9 @@ public class ProcfsBasedProcessTree extends ResourceCalculatorProcessTree {
.equalsIgnoreCase(READ_EXECUTE_WITH_SHARED_PERMISSION)) { .equalsIgnoreCase(READ_EXECUTE_WITH_SHARED_PERMISSION)) {
continue; continue;
} }
if (total == UNAVAILABLE){
total = 0;
}
total += total +=
Math.min(info.sharedDirty, info.pss) + info.privateDirty Math.min(info.sharedDirty, info.pss) + info.privateDirty
+ info.privateClean; + info.privateClean;
@ -429,30 +419,34 @@ public class ProcfsBasedProcessTree extends ResourceCalculatorProcessTree {
} }
} }
} }
total = (total * KB_TO_BYTES); // convert to bytes if (total > 0) {
total *= KB_TO_BYTES; // convert to bytes
}
LOG.info("SmapBasedCumulativeRssmem (bytes) : " + total); LOG.info("SmapBasedCumulativeRssmem (bytes) : " + total);
return total; // size return total; // size
} }
/**
* Get the CPU time in millisecond used by all the processes in the
* process-tree since the process-tree created
*
* @return cumulative CPU time in millisecond since the process-tree created
* return 0 if it cannot be calculated
*/
@Override @Override
public long getCumulativeCpuTime() { public long getCumulativeCpuTime() {
if (JIFFY_LENGTH_IN_MILLIS < 0) { if (JIFFY_LENGTH_IN_MILLIS < 0) {
return 0; return UNAVAILABLE;
} }
long incJiffies = 0; long incJiffies = 0;
boolean isAvailable = false;
for (ProcessInfo p : processTree.values()) { for (ProcessInfo p : processTree.values()) {
if (p != null) { if (p != null) {
incJiffies += p.getDtime(); incJiffies += p.getDtime();
// data is available
isAvailable = true;
} }
} }
cpuTime += incJiffies * JIFFY_LENGTH_IN_MILLIS; if (isAvailable) {
// reset cpuTime to 0 instead of UNAVAILABLE
if (cpuTime == UNAVAILABLE) {
cpuTime = 0L;
}
cpuTime += incJiffies * JIFFY_LENGTH_IN_MILLIS;
}
return cpuTime; return cpuTime;
} }
@ -1031,8 +1025,8 @@ public class ProcfsBasedProcessTree extends ResourceCalculatorProcessTree {
System.out.println("Cpu usage " + procfsBasedProcessTree System.out.println("Cpu usage " + procfsBasedProcessTree
.getCpuUsagePercent()); .getCpuUsagePercent());
System.out.println("Vmem usage in bytes " + procfsBasedProcessTree System.out.println("Vmem usage in bytes " + procfsBasedProcessTree
.getCumulativeVmem()); .getVirtualMemorySize());
System.out.println("Rss mem usage in bytes " + procfsBasedProcessTree System.out.println("Rss mem usage in bytes " + procfsBasedProcessTree
.getCumulativeRssmem()); .getRssMemorySize());
} }
} }

View File

@ -23,19 +23,23 @@ import java.lang.reflect.Constructor;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience.Public; import org.apache.hadoop.classification.InterfaceAudience.Public;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceStability.Evolving; import org.apache.hadoop.classification.InterfaceStability.Evolving;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured; import org.apache.hadoop.conf.Configured;
/** /**
* Interface class to obtain process resource usage * Interface class to obtain process resource usage
* * NOTE: This class should not be used by external users, but only by external
* developers to extend and include their own process-tree implementation,
* especially for platforms other than Linux and Windows.
*/ */
@Public @Public
@Evolving @Evolving
public abstract class ResourceCalculatorProcessTree extends Configured { public abstract class ResourceCalculatorProcessTree extends Configured {
static final Log LOG = LogFactory static final Log LOG = LogFactory
.getLog(ResourceCalculatorProcessTree.class); .getLog(ResourceCalculatorProcessTree.class);
public static final int UNAVAILABLE = -1;
/** /**
* Create process-tree instance with specified root process. * Create process-tree instance with specified root process.
@ -65,63 +69,64 @@ public abstract class ResourceCalculatorProcessTree extends Configured {
public abstract String getProcessTreeDump(); public abstract String getProcessTreeDump();
/** /**
* Get the cumulative virtual memory used by all the processes in the * Get the virtual memory used by all the processes in the
* process-tree. * process-tree.
* *
* @return cumulative virtual memory used by the process-tree in bytes. * @return virtual memory used by the process-tree in bytes,
* {@link #UNAVAILABLE} if it cannot be calculated.
*/ */
public long getCumulativeVmem() { public long getVirtualMemorySize() {
return getCumulativeVmem(0); return getVirtualMemorySize(0);
} }
/** /**
* Get the cumulative resident set size (rss) memory used by all the processes * Get the resident set size (rss) memory used by all the processes
* in the process-tree. * in the process-tree.
* *
* @return cumulative rss memory used by the process-tree in bytes. return 0 * @return rss memory used by the process-tree in bytes,
* if it cannot be calculated * {@link #UNAVAILABLE} if it cannot be calculated.
*/ */
public long getCumulativeRssmem() { public long getRssMemorySize() {
return getCumulativeRssmem(0); return getRssMemorySize(0);
} }
/** /**
* Get the cumulative virtual memory used by all the processes in the * Get the virtual memory used by all the processes in the
* process-tree that are older than the passed in age. * process-tree that are older than the passed in age.
* *
* @param olderThanAge processes above this age are included in the * @param olderThanAge processes above this age are included in the
* memory addition * memory addition
* @return cumulative virtual memory used by the process-tree in bytes, * @return virtual memory used by the process-tree in bytes for
* for processes older than this age. return 0 if it cannot be * processes older than the specified age, {@link #UNAVAILABLE} if it
* calculated * cannot be calculated.
*/ */
public long getCumulativeVmem(int olderThanAge) { public long getVirtualMemorySize(int olderThanAge) {
return 0; return UNAVAILABLE;
} }
/** /**
* Get the cumulative resident set size (rss) memory used by all the processes * Get the resident set size (rss) memory used by all the processes
* in the process-tree that are older than the passed in age. * in the process-tree that are older than the passed in age.
* *
* @param olderThanAge processes above this age are included in the * @param olderThanAge processes above this age are included in the
* memory addition * memory addition
* @return cumulative rss memory used by the process-tree in bytes, * @return rss memory used by the process-tree in bytes for
* for processes older than this age. return 0 if it cannot be * processes older than specified age, {@link #UNAVAILABLE} if it cannot be
* calculated * calculated.
*/ */
public long getCumulativeRssmem(int olderThanAge) { public long getRssMemorySize(int olderThanAge) {
return 0; return UNAVAILABLE;
} }
/** /**
* Get the CPU time in millisecond used by all the processes in the * Get the CPU time in millisecond used by all the processes in the
* process-tree since the process-tree was created * process-tree since the process-tree was created
* *
* @return cumulative CPU time in millisecond since the process-tree created * @return cumulative CPU time in millisecond since the process-tree
* return 0 if it cannot be calculated * created, {@link #UNAVAILABLE} if it cannot be calculated.
*/ */
public long getCumulativeCpuTime() { public long getCumulativeCpuTime() {
return 0; return UNAVAILABLE;
} }
/** /**
@ -129,11 +134,11 @@ public abstract class ResourceCalculatorProcessTree extends Configured {
* average between samples as a ratio of overall CPU cycles similar to top. * average between samples as a ratio of overall CPU cycles similar to top.
* Thus, if 2 out of 4 cores are used this should return 200.0. * Thus, if 2 out of 4 cores are used this should return 200.0.
* *
* @return percentage CPU usage since the process-tree was created * @return percentage CPU usage since the process-tree was created,
* return {@link CpuTimeTracker#UNAVAILABLE} if it cannot be calculated * {@link #UNAVAILABLE} if it cannot be calculated.
*/ */
public float getCpuUsagePercent() { public float getCpuUsagePercent() {
return -1; return UNAVAILABLE;
} }
/** Verify that the tree process id is same as its process group id. /** Verify that the tree process id is same as its process group id.
@ -153,6 +158,7 @@ public abstract class ResourceCalculatorProcessTree extends Configured {
* @return ResourceCalculatorProcessTree or null if ResourceCalculatorPluginTree * @return ResourceCalculatorProcessTree or null if ResourceCalculatorPluginTree
* is not available for this system. * is not available for this system.
*/ */
@Private
public static ResourceCalculatorProcessTree getResourceCalculatorProcessTree( public static ResourceCalculatorProcessTree getResourceCalculatorProcessTree(
String pid, Class<? extends ResourceCalculatorProcessTree> clazz, Configuration conf) { String pid, Class<? extends ResourceCalculatorProcessTree> clazz, Configuration conf) {

View File

@ -45,7 +45,7 @@ public class WindowsBasedProcessTree extends ResourceCalculatorProcessTree {
} }
private String taskProcessId = null; private String taskProcessId = null;
private long cpuTimeMs = 0; private long cpuTimeMs = UNAVAILABLE;
private Map<String, ProcessInfo> processTree = private Map<String, ProcessInfo> processTree =
new HashMap<String, ProcessInfo>(); new HashMap<String, ProcessInfo>();
@ -173,10 +173,13 @@ public class WindowsBasedProcessTree extends ResourceCalculatorProcessTree {
} }
@Override @Override
public long getCumulativeVmem(int olderThanAge) { public long getVirtualMemorySize(int olderThanAge) {
long total = 0; long total = UNAVAILABLE;
for (ProcessInfo p : processTree.values()) { for (ProcessInfo p : processTree.values()) {
if ((p != null) && (p.age > olderThanAge)) { if ((p != null) && (p.age > olderThanAge)) {
if (total == UNAVAILABLE) {
total = 0;
}
total += p.vmem; total += p.vmem;
} }
} }
@ -184,10 +187,13 @@ public class WindowsBasedProcessTree extends ResourceCalculatorProcessTree {
} }
@Override @Override
public long getCumulativeRssmem(int olderThanAge) { public long getRssMemorySize(int olderThanAge) {
long total = 0; long total = UNAVAILABLE;
for (ProcessInfo p : processTree.values()) { for (ProcessInfo p : processTree.values()) {
if ((p != null) && (p.age > olderThanAge)) { if ((p != null) && (p.age > olderThanAge)) {
if (total == UNAVAILABLE) {
total = 0;
}
total += p.workingSet; total += p.workingSet;
} }
} }
@ -197,6 +203,9 @@ public class WindowsBasedProcessTree extends ResourceCalculatorProcessTree {
@Override @Override
public long getCumulativeCpuTime() { public long getCumulativeCpuTime() {
for (ProcessInfo p : processTree.values()) { for (ProcessInfo p : processTree.values()) {
if (cpuTimeMs == UNAVAILABLE) {
cpuTimeMs = 0;
}
cpuTimeMs += p.cpuTimeMsDelta; cpuTimeMs += p.cpuTimeMsDelta;
} }
return cpuTimeMs; return cpuTimeMs;

View File

@ -19,6 +19,7 @@
package org.apache.hadoop.yarn.util; package org.apache.hadoop.yarn.util;
import static org.apache.hadoop.yarn.util.ProcfsBasedProcessTree.KB_TO_BYTES; import static org.apache.hadoop.yarn.util.ProcfsBasedProcessTree.KB_TO_BYTES;
import static org.apache.hadoop.yarn.util.ResourceCalculatorProcessTree.UNAVAILABLE;
import static org.junit.Assert.fail; import static org.junit.Assert.fail;
import static org.junit.Assume.assumeTrue; import static org.junit.Assume.assumeTrue;
@ -226,8 +227,8 @@ public class TestProcfsBasedProcessTree {
p.updateProcessTree(); p.updateProcessTree();
Assert.assertFalse("ProcessTree must have been gone", isAlive(pid)); Assert.assertFalse("ProcessTree must have been gone", isAlive(pid));
Assert.assertTrue( Assert.assertTrue(
"Cumulative vmem for the gone-process is " + p.getCumulativeVmem() "vmem for the gone-process is " + p.getVirtualMemorySize()
+ " . It should be zero.", p.getCumulativeVmem() == 0); + " . It should be zero.", p.getVirtualMemorySize() == 0);
Assert.assertTrue(p.toString().equals("[ ]")); Assert.assertTrue(p.toString().equals("[ ]"));
} }
@ -429,16 +430,16 @@ public class TestProcfsBasedProcessTree {
// build the process tree. // build the process tree.
processTree.updateProcessTree(); processTree.updateProcessTree();
// verify cumulative memory // verify virtual memory
Assert.assertEquals("Cumulative virtual memory does not match", 600000L, Assert.assertEquals("Virtual memory does not match", 600000L,
processTree.getCumulativeVmem()); processTree.getVirtualMemorySize());
// verify rss memory // verify rss memory
long cumuRssMem = long cumuRssMem =
ProcfsBasedProcessTree.PAGE_SIZE > 0 ProcfsBasedProcessTree.PAGE_SIZE > 0
? 600L * ProcfsBasedProcessTree.PAGE_SIZE : 0L; ? 600L * ProcfsBasedProcessTree.PAGE_SIZE : 0L;
Assert.assertEquals("Cumulative rss memory does not match", cumuRssMem, Assert.assertEquals("rss memory does not match", cumuRssMem,
processTree.getCumulativeRssmem()); processTree.getRssMemorySize());
// verify cumulative cpu time // verify cumulative cpu time
long cumuCpuTime = long cumuCpuTime =
@ -456,8 +457,8 @@ public class TestProcfsBasedProcessTree {
setSmapsInProceTree(processTree, true); setSmapsInProceTree(processTree, true);
// RSS=Min(shared_dirty,PSS)+PrivateClean+PrivateDirty (exclude r-xs, // RSS=Min(shared_dirty,PSS)+PrivateClean+PrivateDirty (exclude r-xs,
// r--s) // r--s)
Assert.assertEquals("Cumulative rss memory does not match", Assert.assertEquals("rss memory does not match",
(100 * KB_TO_BYTES * 3), processTree.getCumulativeRssmem()); (100 * KB_TO_BYTES * 3), processTree.getRssMemorySize());
// test the cpu time again to see if it cumulates // test the cpu time again to see if it cumulates
procInfos[0] = procInfos[0] =
@ -563,9 +564,9 @@ public class TestProcfsBasedProcessTree {
new SystemClock()); new SystemClock());
setSmapsInProceTree(processTree, smapEnabled); setSmapsInProceTree(processTree, smapEnabled);
// verify cumulative memory // verify virtual memory
Assert.assertEquals("Cumulative memory does not match", 700000L, Assert.assertEquals("Cumulative memory does not match", 700000L,
processTree.getCumulativeVmem()); processTree.getVirtualMemorySize());
// write one more process as child of 100. // write one more process as child of 100.
String[] newPids = { "500" }; String[] newPids = { "500" };
setupPidDirs(procfsRootDir, newPids); setupPidDirs(procfsRootDir, newPids);
@ -581,34 +582,34 @@ public class TestProcfsBasedProcessTree {
// check memory includes the new process. // check memory includes the new process.
processTree.updateProcessTree(); processTree.updateProcessTree();
Assert.assertEquals("Cumulative vmem does not include new process", Assert.assertEquals("vmem does not include new process",
1200000L, processTree.getCumulativeVmem()); 1200000L, processTree.getVirtualMemorySize());
if (!smapEnabled) { if (!smapEnabled) {
long cumuRssMem = long cumuRssMem =
ProcfsBasedProcessTree.PAGE_SIZE > 0 ProcfsBasedProcessTree.PAGE_SIZE > 0
? 1200L * ProcfsBasedProcessTree.PAGE_SIZE : 0L; ? 1200L * ProcfsBasedProcessTree.PAGE_SIZE : 0L;
Assert.assertEquals("Cumulative rssmem does not include new process", Assert.assertEquals("rssmem does not include new process",
cumuRssMem, processTree.getCumulativeRssmem()); cumuRssMem, processTree.getRssMemorySize());
} else { } else {
Assert.assertEquals("Cumulative rssmem does not include new process", Assert.assertEquals("rssmem does not include new process",
100 * KB_TO_BYTES * 4, processTree.getCumulativeRssmem()); 100 * KB_TO_BYTES * 4, processTree.getRssMemorySize());
} }
// however processes older than 1 iteration will retain the older value // however processes older than 1 iteration will retain the older value
Assert.assertEquals( Assert.assertEquals(
"Cumulative vmem shouldn't have included new process", 700000L, "vmem shouldn't have included new process", 700000L,
processTree.getCumulativeVmem(1)); processTree.getVirtualMemorySize(1));
if (!smapEnabled) { if (!smapEnabled) {
long cumuRssMem = long cumuRssMem =
ProcfsBasedProcessTree.PAGE_SIZE > 0 ProcfsBasedProcessTree.PAGE_SIZE > 0
? 700L * ProcfsBasedProcessTree.PAGE_SIZE : 0L; ? 700L * ProcfsBasedProcessTree.PAGE_SIZE : 0L;
Assert.assertEquals( Assert.assertEquals(
"Cumulative rssmem shouldn't have included new process", cumuRssMem, "rssmem shouldn't have included new process", cumuRssMem,
processTree.getCumulativeRssmem(1)); processTree.getRssMemorySize(1));
} else { } else {
Assert.assertEquals( Assert.assertEquals(
"Cumulative rssmem shouldn't have included new process", "rssmem shouldn't have included new process",
100 * KB_TO_BYTES * 3, processTree.getCumulativeRssmem(1)); 100 * KB_TO_BYTES * 3, processTree.getRssMemorySize(1));
} }
// one more process // one more process
@ -629,49 +630,49 @@ public class TestProcfsBasedProcessTree {
// processes older than 2 iterations should be same as before. // processes older than 2 iterations should be same as before.
Assert.assertEquals( Assert.assertEquals(
"Cumulative vmem shouldn't have included new processes", 700000L, "vmem shouldn't have included new processes", 700000L,
processTree.getCumulativeVmem(2)); processTree.getVirtualMemorySize(2));
if (!smapEnabled) { if (!smapEnabled) {
long cumuRssMem = long cumuRssMem =
ProcfsBasedProcessTree.PAGE_SIZE > 0 ProcfsBasedProcessTree.PAGE_SIZE > 0
? 700L * ProcfsBasedProcessTree.PAGE_SIZE : 0L; ? 700L * ProcfsBasedProcessTree.PAGE_SIZE : 0L;
Assert.assertEquals( Assert.assertEquals(
"Cumulative rssmem shouldn't have included new processes", "rssmem shouldn't have included new processes",
cumuRssMem, processTree.getCumulativeRssmem(2)); cumuRssMem, processTree.getRssMemorySize(2));
} else { } else {
Assert.assertEquals( Assert.assertEquals(
"Cumulative rssmem shouldn't have included new processes", "rssmem shouldn't have included new processes",
100 * KB_TO_BYTES * 3, processTree.getCumulativeRssmem(2)); 100 * KB_TO_BYTES * 3, processTree.getRssMemorySize(2));
} }
// processes older than 1 iteration should not include new process, // processes older than 1 iteration should not include new process,
// but include process 500 // but include process 500
Assert.assertEquals( Assert.assertEquals(
"Cumulative vmem shouldn't have included new processes", 1200000L, "vmem shouldn't have included new processes", 1200000L,
processTree.getCumulativeVmem(1)); processTree.getVirtualMemorySize(1));
if (!smapEnabled) { if (!smapEnabled) {
long cumuRssMem = long cumuRssMem =
ProcfsBasedProcessTree.PAGE_SIZE > 0 ProcfsBasedProcessTree.PAGE_SIZE > 0
? 1200L * ProcfsBasedProcessTree.PAGE_SIZE : 0L; ? 1200L * ProcfsBasedProcessTree.PAGE_SIZE : 0L;
Assert.assertEquals( Assert.assertEquals(
"Cumulative rssmem shouldn't have included new processes", "rssmem shouldn't have included new processes",
cumuRssMem, processTree.getCumulativeRssmem(1)); cumuRssMem, processTree.getRssMemorySize(1));
} else { } else {
Assert.assertEquals( Assert.assertEquals(
"Cumulative rssmem shouldn't have included new processes", "rssmem shouldn't have included new processes",
100 * KB_TO_BYTES * 4, processTree.getCumulativeRssmem(1)); 100 * KB_TO_BYTES * 4, processTree.getRssMemorySize(1));
} }
// no processes older than 3 iterations, this should be 0 // no processes older than 3 iterations
Assert.assertEquals( Assert.assertEquals(
"Getting non-zero vmem for processes older than 3 iterations", 0L, "Getting non-zero vmem for processes older than 3 iterations",
processTree.getCumulativeVmem(3)); UNAVAILABLE, processTree.getVirtualMemorySize(3));
Assert.assertEquals( Assert.assertEquals(
"Getting non-zero rssmem for processes older than 3 iterations", 0L, "Getting non-zero rssmem for processes older than 3 iterations",
processTree.getCumulativeRssmem(3)); UNAVAILABLE, processTree.getRssMemorySize(3));
Assert.assertEquals( Assert.assertEquals(
"Getting non-zero rssmem for processes older than 3 iterations", 0L, "Getting non-zero rssmem for processes older than 3 iterations",
processTree.getCumulativeRssmem(3)); UNAVAILABLE, processTree.getRssMemorySize(3));
} finally { } finally {
FileUtil.fullyDelete(procfsRootDir); FileUtil.fullyDelete(procfsRootDir);
} }

View File

@ -41,11 +41,11 @@ public class TestResourceCalculatorProcessTree {
return "Empty tree for testing"; return "Empty tree for testing";
} }
public long getCumulativeRssmem(int age) { public long getRssMemorySize(int age) {
return 0; return 0;
} }
public long getCumulativeVmem(int age) { public long getVirtualMemorySize(int age) {
return 0; return 0;
} }

View File

@ -53,26 +53,26 @@ public class TestWindowsBasedProcessTree {
WindowsBasedProcessTreeTester pTree = new WindowsBasedProcessTreeTester("-1"); WindowsBasedProcessTreeTester pTree = new WindowsBasedProcessTreeTester("-1");
pTree.infoStr = "3524,1024,1024,500\r\n2844,1024,1024,500\r\n"; pTree.infoStr = "3524,1024,1024,500\r\n2844,1024,1024,500\r\n";
pTree.updateProcessTree(); pTree.updateProcessTree();
assertTrue(pTree.getCumulativeVmem() == 2048); assertTrue(pTree.getVirtualMemorySize() == 2048);
assertTrue(pTree.getCumulativeVmem(0) == 2048); assertTrue(pTree.getVirtualMemorySize(0) == 2048);
assertTrue(pTree.getCumulativeRssmem() == 2048); assertTrue(pTree.getRssMemorySize() == 2048);
assertTrue(pTree.getCumulativeRssmem(0) == 2048); assertTrue(pTree.getRssMemorySize(0) == 2048);
assertTrue(pTree.getCumulativeCpuTime() == 1000); assertTrue(pTree.getCumulativeCpuTime() == 1000);
pTree.infoStr = "3524,1024,1024,1000\r\n2844,1024,1024,1000\r\n1234,1024,1024,1000\r\n"; pTree.infoStr = "3524,1024,1024,1000\r\n2844,1024,1024,1000\r\n1234,1024,1024,1000\r\n";
pTree.updateProcessTree(); pTree.updateProcessTree();
assertTrue(pTree.getCumulativeVmem() == 3072); assertTrue(pTree.getVirtualMemorySize() == 3072);
assertTrue(pTree.getCumulativeVmem(1) == 2048); assertTrue(pTree.getVirtualMemorySize(1) == 2048);
assertTrue(pTree.getCumulativeRssmem() == 3072); assertTrue(pTree.getRssMemorySize() == 3072);
assertTrue(pTree.getCumulativeRssmem(1) == 2048); assertTrue(pTree.getRssMemorySize(1) == 2048);
assertTrue(pTree.getCumulativeCpuTime() == 3000); assertTrue(pTree.getCumulativeCpuTime() == 3000);
pTree.infoStr = "3524,1024,1024,1500\r\n2844,1024,1024,1500\r\n"; pTree.infoStr = "3524,1024,1024,1500\r\n2844,1024,1024,1500\r\n";
pTree.updateProcessTree(); pTree.updateProcessTree();
assertTrue(pTree.getCumulativeVmem() == 2048); assertTrue(pTree.getVirtualMemorySize() == 2048);
assertTrue(pTree.getCumulativeVmem(2) == 2048); assertTrue(pTree.getVirtualMemorySize(2) == 2048);
assertTrue(pTree.getCumulativeRssmem() == 2048); assertTrue(pTree.getRssMemorySize() == 2048);
assertTrue(pTree.getCumulativeRssmem(2) == 2048); assertTrue(pTree.getRssMemorySize(2) == 2048);
assertTrue(pTree.getCumulativeCpuTime() == 4000); assertTrue(pTree.getCumulativeCpuTime() == 4000);
} }
} }

View File

@ -188,13 +188,19 @@ public class ContainerMetrics implements MetricsSource {
} }
public void recordMemoryUsage(int memoryMBs) { public void recordMemoryUsage(int memoryMBs) {
this.pMemMBsStat.add(memoryMBs); if (memoryMBs >= 0) {
this.pMemMBsStat.add(memoryMBs);
}
} }
public void recordCpuUsage( public void recordCpuUsage(
int totalPhysicalCpuPercent, int milliVcoresUsed) { int totalPhysicalCpuPercent, int milliVcoresUsed) {
this.cpuCoreUsagePercent.add(totalPhysicalCpuPercent); if (totalPhysicalCpuPercent >=0) {
this.milliVcoresUsed.add(milliVcoresUsed); this.cpuCoreUsagePercent.add(totalPhysicalCpuPercent);
}
if (milliVcoresUsed >= 0) {
this.milliVcoresUsed.add(milliVcoresUsed);
}
} }
public void recordProcessId(String processId) { public void recordProcessId(String processId) {

View File

@ -333,10 +333,10 @@ public class ContainersMonitorImpl extends AbstractService implements
// method provided just for easy testing purposes // method provided just for easy testing purposes
boolean isProcessTreeOverLimit(ResourceCalculatorProcessTree pTree, boolean isProcessTreeOverLimit(ResourceCalculatorProcessTree pTree,
String containerId, long limit) { String containerId, long limit) {
long currentMemUsage = pTree.getCumulativeVmem(); long currentMemUsage = pTree.getVirtualMemorySize();
// as processes begin with an age 1, we want to see if there are processes // as processes begin with an age 1, we want to see if there are processes
// more than 1 iteration old. // more than 1 iteration old.
long curMemUsageOfAgedProcesses = pTree.getCumulativeVmem(1); long curMemUsageOfAgedProcesses = pTree.getVirtualMemorySize(1);
return isProcessTreeOverLimit(containerId, currentMemUsage, return isProcessTreeOverLimit(containerId, currentMemUsage,
curMemUsageOfAgedProcesses, limit); curMemUsageOfAgedProcesses, limit);
} }
@ -437,8 +437,8 @@ public class ContainersMonitorImpl extends AbstractService implements
+ " ContainerId = " + containerId); + " ContainerId = " + containerId);
ResourceCalculatorProcessTree pTree = ptInfo.getProcessTree(); ResourceCalculatorProcessTree pTree = ptInfo.getProcessTree();
pTree.updateProcessTree(); // update process-tree pTree.updateProcessTree(); // update process-tree
long currentVmemUsage = pTree.getCumulativeVmem(); long currentVmemUsage = pTree.getVirtualMemorySize();
long currentPmemUsage = pTree.getCumulativeRssmem(); long currentPmemUsage = pTree.getRssMemorySize();
// if machine has 6 cores and 3 are used, // if machine has 6 cores and 3 are used,
// cpuUsagePercentPerCore should be 300% and // cpuUsagePercentPerCore should be 300% and
// cpuUsageTotalCoresPercentage should be 50% // cpuUsageTotalCoresPercentage should be 50%
@ -451,8 +451,8 @@ public class ContainersMonitorImpl extends AbstractService implements
* maxVCoresAllottedForContainers /nodeCpuPercentageForYARN); * maxVCoresAllottedForContainers /nodeCpuPercentageForYARN);
// as processes begin with an age 1, we want to see if there // as processes begin with an age 1, we want to see if there
// are processes more than 1 iteration old. // are processes more than 1 iteration old.
long curMemUsageOfAgedProcesses = pTree.getCumulativeVmem(1); long curMemUsageOfAgedProcesses = pTree.getVirtualMemorySize(1);
long curRssMemUsageOfAgedProcesses = pTree.getCumulativeRssmem(1); long curRssMemUsageOfAgedProcesses = pTree.getRssMemorySize(1);
long vmemLimit = ptInfo.getVmemLimit(); long vmemLimit = ptInfo.getVmemLimit();
long pmemLimit = ptInfo.getPmemLimit(); long pmemLimit = ptInfo.getPmemLimit();
LOG.info(String.format( LOG.info(String.format(