YARN-3304. Addendum patch. Cleaning up ResourceCalculatorProcessTree APIs for public use and removing inconsistencies in the default values. (Junping Du and Karthik Kambatla via vinodkv)

(cherry picked from commit 7610925e90)
This commit is contained in:
Vinod Kumar Vavilapalli 2015-03-31 17:27:46 -07:00
parent b85bbca745
commit c980e34bc5
6 changed files with 236 additions and 46 deletions

View File

@ -344,16 +344,24 @@ public class ProcfsBasedProcessTree extends ResourceCalculatorProcessTree {
public long getVirtualMemorySize(int olderThanAge) {
long total = UNAVAILABLE;
for (ProcessInfo p : processTree.values()) {
if ((p != null) && (p.getAge() > olderThanAge)) {
if (p != null) {
if (total == UNAVAILABLE ) {
total = 0;
}
if (p.getAge() > olderThanAge) {
total += p.getVmem();
}
}
}
return total;
}
@Override
@SuppressWarnings("deprecation")
public long getCumulativeVmem(int olderThanAge) {
return getVirtualMemorySize(olderThanAge);
}
@Override
public long getRssMemorySize(int olderThanAge) {
if (PAGE_SIZE < 0) {
@ -365,14 +373,22 @@ public class ProcfsBasedProcessTree extends ResourceCalculatorProcessTree {
boolean isAvailable = false;
long totalPages = 0;
for (ProcessInfo p : processTree.values()) {
if ((p != null) && (p.getAge() > olderThanAge)) {
if ((p != null) ) {
if (p.getAge() > olderThanAge) {
totalPages += p.getRssmemPage();
}
isAvailable = true;
}
}
return isAvailable ? totalPages * PAGE_SIZE : UNAVAILABLE; // convert # pages to byte
}
@Override
@SuppressWarnings("deprecation")
public long getCumulativeRssmem(int olderThanAge) {
return getRssMemorySize(olderThanAge);
}
/**
* Get the resident set size (RSS) memory used by all the processes
* in the process-tree that are older than the passed in age. RSS is
@ -388,7 +404,12 @@ public class ProcfsBasedProcessTree extends ResourceCalculatorProcessTree {
private long getSmapBasedRssMemorySize(int olderThanAge) {
long total = UNAVAILABLE;
for (ProcessInfo p : processTree.values()) {
if ((p != null) && (p.getAge() > olderThanAge)) {
if (p != null) {
// set resource to 0 instead of UNAVAILABLE
if (total == UNAVAILABLE){
total = 0;
}
if (p.getAge() > olderThanAge) {
ProcessTreeSmapMemInfo procMemInfo = processSMAPTree.get(p.getPid());
if (procMemInfo != null) {
for (ProcessSmapMemoryInfo info : procMemInfo.getMemoryInfoList()) {
@ -399,9 +420,7 @@ public class ProcfsBasedProcessTree extends ResourceCalculatorProcessTree {
.equalsIgnoreCase(READ_EXECUTE_WITH_SHARED_PERMISSION)) {
continue;
}
if (total == UNAVAILABLE){
total = 0;
}
total +=
Math.min(info.sharedDirty, info.pss) + info.privateDirty
+ info.privateClean;
@ -414,11 +433,14 @@ public class ProcfsBasedProcessTree extends ResourceCalculatorProcessTree {
}
}
}
if (LOG.isDebugEnabled()) {
LOG.debug(procMemInfo.toString());
}
}
}
}
if (total > 0) {
total *= KB_TO_BYTES; // convert to bytes
}

View File

@ -79,6 +79,18 @@ public abstract class ResourceCalculatorProcessTree extends Configured {
return getVirtualMemorySize(0);
}
/**
* Get the virtual memory used by all the processes in the
* process-tree.
*
* @return virtual memory used by the process-tree in bytes,
* {@link #UNAVAILABLE} if it cannot be calculated.
*/
@Deprecated
public long getCumulativeVmem() {
return getCumulativeVmem(0);
}
/**
* Get the resident set size (rss) memory used by all the processes
* in the process-tree.
@ -90,6 +102,18 @@ public abstract class ResourceCalculatorProcessTree extends Configured {
return getRssMemorySize(0);
}
/**
* Get the resident set size (rss) memory used by all the processes
* in the process-tree.
*
* @return rss memory used by the process-tree in bytes,
* {@link #UNAVAILABLE} if it cannot be calculated.
*/
@Deprecated
public long getCumulativeRssmem() {
return getCumulativeRssmem(0);
}
/**
* Get the virtual memory used by all the processes in the
* process-tree that are older than the passed in age.
@ -104,6 +128,21 @@ public abstract class ResourceCalculatorProcessTree extends Configured {
return UNAVAILABLE;
}
/**
* Get the virtual memory used by all the processes in the
* process-tree that are older than the passed in age.
*
* @param olderThanAge processes above this age are included in the
* memory addition
* @return virtual memory used by the process-tree in bytes for
* processes older than the specified age, {@link #UNAVAILABLE} if it
* cannot be calculated.
*/
@Deprecated
public long getCumulativeVmem(int olderThanAge) {
return UNAVAILABLE;
}
/**
* Get the resident set size (rss) memory used by all the processes
* in the process-tree that are older than the passed in age.
@ -118,6 +157,21 @@ public abstract class ResourceCalculatorProcessTree extends Configured {
return UNAVAILABLE;
}
/**
* Get the resident set size (rss) memory used by all the processes
* in the process-tree that are older than the passed in age.
*
* @param olderThanAge processes above this age are included in the
* memory addition
* @return rss memory used by the process-tree in bytes for
* processes older than specified age, {@link #UNAVAILABLE} if it cannot be
* calculated.
*/
@Deprecated
public long getCumulativeRssmem(int olderThanAge) {
return UNAVAILABLE;
}
/**
* Get the CPU time in millisecond used by all the processes in the
* process-tree since the process-tree was created
@ -158,7 +212,6 @@ public abstract class ResourceCalculatorProcessTree extends Configured {
* @return ResourceCalculatorProcessTree or null if ResourceCalculatorPluginTree
* is not available for this system.
*/
@Private
public static ResourceCalculatorProcessTree getResourceCalculatorProcessTree(
String pid, Class<? extends ResourceCalculatorProcessTree> clazz, Configuration conf) {

View File

@ -176,30 +176,46 @@ public class WindowsBasedProcessTree extends ResourceCalculatorProcessTree {
public long getVirtualMemorySize(int olderThanAge) {
long total = UNAVAILABLE;
for (ProcessInfo p : processTree.values()) {
if ((p != null) && (p.age > olderThanAge)) {
if (p != null) {
if (total == UNAVAILABLE) {
total = 0;
}
if (p.age > olderThanAge) {
total += p.vmem;
}
}
}
return total;
}
@Override
@SuppressWarnings("deprecation")
public long getCumulativeVmem(int olderThanAge) {
return getVirtualMemorySize(olderThanAge);
}
@Override
public long getRssMemorySize(int olderThanAge) {
long total = UNAVAILABLE;
for (ProcessInfo p : processTree.values()) {
if ((p != null) && (p.age > olderThanAge)) {
if (p != null) {
if (total == UNAVAILABLE) {
total = 0;
}
if (p.age > olderThanAge) {
total += p.workingSet;
}
}
}
return total;
}
@Override
@SuppressWarnings("deprecation")
public long getCumulativeRssmem(int olderThanAge) {
return getRssMemorySize(olderThanAge);
}
@Override
public long getCumulativeCpuTime() {
for (ProcessInfo p : processTree.values()) {

View File

@ -117,6 +117,7 @@ public class TestProcfsBasedProcessTree {
}
@Test(timeout = 30000)
@SuppressWarnings("deprecation")
public void testProcessTree() throws Exception {
try {
Assert.assertTrue(ProcfsBasedProcessTree.isAvailable());
@ -226,9 +227,13 @@ public class TestProcfsBasedProcessTree {
// ProcessTree is gone now. Any further calls should be sane.
p.updateProcessTree();
Assert.assertFalse("ProcessTree must have been gone", isAlive(pid));
Assert.assertTrue(
"vmem for the gone-process is " + p.getVirtualMemorySize()
+ " . It should be zero.", p.getVirtualMemorySize() == 0);
Assert.assertTrue(
"vmem (old API) for the gone-process is " + p.getCumulativeVmem()
+ " . It should be zero.", p.getCumulativeVmem() == 0);
Assert.assertTrue(p.toString().equals("[ ]"));
}
@ -385,6 +390,7 @@ public class TestProcfsBasedProcessTree {
* files.
*/
@Test(timeout = 30000)
@SuppressWarnings("deprecation")
public void testCpuAndMemoryForProcessTree() throws IOException {
// test processes
@ -437,9 +443,13 @@ public class TestProcfsBasedProcessTree {
// verify rss memory
long cumuRssMem =
ProcfsBasedProcessTree.PAGE_SIZE > 0
? 600L * ProcfsBasedProcessTree.PAGE_SIZE : 0L;
? 600L * ProcfsBasedProcessTree.PAGE_SIZE :
ResourceCalculatorProcessTree.UNAVAILABLE;
Assert.assertEquals("rss memory does not match", cumuRssMem,
processTree.getRssMemorySize());
// verify old API
Assert.assertEquals("rss memory (old API) does not match", cumuRssMem,
processTree.getCumulativeRssmem());
// verify cumulative cpu time
long cumuCpuTime =
@ -459,6 +469,9 @@ public class TestProcfsBasedProcessTree {
// r--s)
Assert.assertEquals("rss memory does not match",
(100 * KB_TO_BYTES * 3), processTree.getRssMemorySize());
// verify old API
Assert.assertEquals("rss memory (old API) does not match",
(100 * KB_TO_BYTES * 3), processTree.getCumulativeRssmem());
// test the cpu time again to see if it cumulates
procInfos[0] =
@ -524,6 +537,7 @@ public class TestProcfsBasedProcessTree {
testMemForOlderProcesses(true);
}
@SuppressWarnings("deprecation")
private void testMemForOlderProcesses(boolean smapEnabled) throws IOException {
// initial list of processes
String[] pids = { "100", "200", "300", "400" };
@ -565,8 +579,12 @@ public class TestProcfsBasedProcessTree {
setSmapsInProceTree(processTree, smapEnabled);
// verify virtual memory
Assert.assertEquals("Cumulative memory does not match", 700000L,
Assert.assertEquals("Virtual memory does not match", 700000L,
processTree.getVirtualMemorySize());
Assert.assertEquals("Virtual memory (old API) does not match", 700000L,
processTree.getCumulativeVmem());
// write one more process as child of 100.
String[] newPids = { "500" };
setupPidDirs(procfsRootDir, newPids);
@ -584,32 +602,58 @@ public class TestProcfsBasedProcessTree {
processTree.updateProcessTree();
Assert.assertEquals("vmem does not include new process",
1200000L, processTree.getVirtualMemorySize());
Assert.assertEquals("vmem (old API) does not include new process",
1200000L, processTree.getCumulativeVmem());
if (!smapEnabled) {
long cumuRssMem =
ProcfsBasedProcessTree.PAGE_SIZE > 0
? 1200L * ProcfsBasedProcessTree.PAGE_SIZE : 0L;
? 1200L * ProcfsBasedProcessTree.PAGE_SIZE :
ResourceCalculatorProcessTree.UNAVAILABLE;
Assert.assertEquals("rssmem does not include new process",
cumuRssMem, processTree.getRssMemorySize());
// verify old API
Assert.assertEquals("rssmem (old API) does not include new process",
cumuRssMem, processTree.getCumulativeRssmem());
} else {
Assert.assertEquals("rssmem does not include new process",
100 * KB_TO_BYTES * 4, processTree.getRssMemorySize());
// verify old API
Assert.assertEquals("rssmem (old API) does not include new process",
100 * KB_TO_BYTES * 4, processTree.getCumulativeRssmem());
}
// however processes older than 1 iteration will retain the older value
Assert.assertEquals(
"vmem shouldn't have included new process", 700000L,
processTree.getVirtualMemorySize(1));
// verify old API
Assert.assertEquals(
"vmem (old API) shouldn't have included new process", 700000L,
processTree.getCumulativeVmem(1));
if (!smapEnabled) {
long cumuRssMem =
ProcfsBasedProcessTree.PAGE_SIZE > 0
? 700L * ProcfsBasedProcessTree.PAGE_SIZE : 0L;
? 700L * ProcfsBasedProcessTree.PAGE_SIZE :
ResourceCalculatorProcessTree.UNAVAILABLE;
Assert.assertEquals(
"rssmem shouldn't have included new process", cumuRssMem,
processTree.getRssMemorySize(1));
// Verify old API
Assert.assertEquals(
"rssmem (old API) shouldn't have included new process", cumuRssMem,
processTree.getCumulativeRssmem(1));
} else {
Assert.assertEquals(
"rssmem shouldn't have included new process",
100 * KB_TO_BYTES * 3, processTree.getRssMemorySize(1));
// Verify old API
Assert.assertEquals(
"rssmem (old API) shouldn't have included new process",
100 * KB_TO_BYTES * 3, processTree.getCumulativeRssmem(1));
}
// one more process
@ -632,17 +676,32 @@ public class TestProcfsBasedProcessTree {
Assert.assertEquals(
"vmem shouldn't have included new processes", 700000L,
processTree.getVirtualMemorySize(2));
// verify old API
Assert.assertEquals(
"vmem (old API) shouldn't have included new processes", 700000L,
processTree.getCumulativeVmem(2));
if (!smapEnabled) {
long cumuRssMem =
ProcfsBasedProcessTree.PAGE_SIZE > 0
? 700L * ProcfsBasedProcessTree.PAGE_SIZE : 0L;
? 700L * ProcfsBasedProcessTree.PAGE_SIZE :
ResourceCalculatorProcessTree.UNAVAILABLE;
Assert.assertEquals(
"rssmem shouldn't have included new processes",
cumuRssMem, processTree.getRssMemorySize(2));
// Verify old API
Assert.assertEquals(
"rssmem (old API) shouldn't have included new processes",
cumuRssMem, processTree.getCumulativeRssmem(2));
} else {
Assert.assertEquals(
"rssmem shouldn't have included new processes",
100 * KB_TO_BYTES * 3, processTree.getRssMemorySize(2));
// Verify old API
Assert.assertEquals(
"rssmem (old API) shouldn't have included new processes",
100 * KB_TO_BYTES * 3, processTree.getCumulativeRssmem(2));
}
// processes older than 1 iteration should not include new process,
@ -650,29 +709,46 @@ public class TestProcfsBasedProcessTree {
Assert.assertEquals(
"vmem shouldn't have included new processes", 1200000L,
processTree.getVirtualMemorySize(1));
// verify old API
Assert.assertEquals(
"vmem (old API) shouldn't have included new processes", 1200000L,
processTree.getCumulativeVmem(1));
if (!smapEnabled) {
long cumuRssMem =
ProcfsBasedProcessTree.PAGE_SIZE > 0
? 1200L * ProcfsBasedProcessTree.PAGE_SIZE : 0L;
? 1200L * ProcfsBasedProcessTree.PAGE_SIZE :
ResourceCalculatorProcessTree.UNAVAILABLE;
Assert.assertEquals(
"rssmem shouldn't have included new processes",
cumuRssMem, processTree.getRssMemorySize(1));
// verify old API
Assert.assertEquals(
"rssmem (old API) shouldn't have included new processes",
cumuRssMem, processTree.getCumulativeRssmem(1));
} else {
Assert.assertEquals(
"rssmem shouldn't have included new processes",
100 * KB_TO_BYTES * 4, processTree.getRssMemorySize(1));
Assert.assertEquals(
"rssmem (old API) shouldn't have included new processes",
100 * KB_TO_BYTES * 4, processTree.getCumulativeRssmem(1));
}
// no processes older than 3 iterations
Assert.assertEquals(
"Getting non-zero vmem for processes older than 3 iterations",
UNAVAILABLE, processTree.getVirtualMemorySize(3));
0, processTree.getVirtualMemorySize(3));
// verify old API
Assert.assertEquals(
"Getting non-zero vmem (old API) for processes older than 3 iterations",
0, processTree.getCumulativeVmem(3));
Assert.assertEquals(
"Getting non-zero rssmem for processes older than 3 iterations",
UNAVAILABLE, processTree.getRssMemorySize(3));
0, processTree.getRssMemorySize(3));
// verify old API
Assert.assertEquals(
"Getting non-zero rssmem for processes older than 3 iterations",
UNAVAILABLE, processTree.getRssMemorySize(3));
"Getting non-zero rssmem (old API) for processes older than 3 iterations",
0, processTree.getCumulativeRssmem(3));
} finally {
FileUtil.fullyDelete(procfsRootDir);
}

View File

@ -45,10 +45,20 @@ public class TestResourceCalculatorProcessTree {
return 0;
}
@SuppressWarnings("deprecation")
public long getCumulativeRssmem(int age) {
return 0;
}
public long getVirtualMemorySize(int age) {
return 0;
}
@SuppressWarnings("deprecation")
public long getCumulativeVmem(int age) {
return 0;
}
public long getCumulativeCpuTime() {
return 0;
}

View File

@ -41,6 +41,7 @@ public class TestWindowsBasedProcessTree {
}
@Test (timeout = 30000)
@SuppressWarnings("deprecation")
public void tree() {
if( !Shell.WINDOWS) {
LOG.info("Platform not Windows. Not testing");
@ -49,30 +50,42 @@ public class TestWindowsBasedProcessTree {
assertTrue("WindowsBasedProcessTree should be available on Windows",
WindowsBasedProcessTree.isAvailable());
WindowsBasedProcessTreeTester pTree = new WindowsBasedProcessTreeTester("-1");
pTree.infoStr = "3524,1024,1024,500\r\n2844,1024,1024,500\r\n";
pTree.updateProcessTree();
assertTrue(pTree.getVirtualMemorySize() == 2048);
assertTrue(pTree.getCumulativeVmem() == 2048);
assertTrue(pTree.getVirtualMemorySize(0) == 2048);
assertTrue(pTree.getCumulativeVmem(0) == 2048);
assertTrue(pTree.getRssMemorySize() == 2048);
assertTrue(pTree.getCumulativeRssmem() == 2048);
assertTrue(pTree.getRssMemorySize(0) == 2048);
assertTrue(pTree.getCumulativeRssmem(0) == 2048);
assertTrue(pTree.getCumulativeCpuTime() == 1000);
pTree.infoStr = "3524,1024,1024,1000\r\n2844,1024,1024,1000\r\n1234,1024,1024,1000\r\n";
pTree.updateProcessTree();
assertTrue(pTree.getVirtualMemorySize() == 3072);
assertTrue(pTree.getCumulativeVmem() == 3072);
assertTrue(pTree.getVirtualMemorySize(1) == 2048);
assertTrue(pTree.getCumulativeVmem(1) == 2048);
assertTrue(pTree.getRssMemorySize() == 3072);
assertTrue(pTree.getCumulativeRssmem() == 3072);
assertTrue(pTree.getRssMemorySize(1) == 2048);
assertTrue(pTree.getCumulativeRssmem(1) == 2048);
assertTrue(pTree.getCumulativeCpuTime() == 3000);
pTree.infoStr = "3524,1024,1024,1500\r\n2844,1024,1024,1500\r\n";
pTree.updateProcessTree();
assertTrue(pTree.getVirtualMemorySize() == 2048);
assertTrue(pTree.getCumulativeVmem() == 2048);
assertTrue(pTree.getVirtualMemorySize(2) == 2048);
assertTrue(pTree.getCumulativeVmem(2) == 2048);
assertTrue(pTree.getRssMemorySize() == 2048);
assertTrue(pTree.getCumulativeRssmem() == 2048);
assertTrue(pTree.getRssMemorySize(2) == 2048);
assertTrue(pTree.getCumulativeRssmem(2) == 2048);
assertTrue(pTree.getCumulativeCpuTime() == 4000);
}
}