YARN-223. Update process tree instead of getting new process trees. (Radim Kolar via llu)

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1424244 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Luke Lu 2012-12-20 00:20:53 +00:00
parent aa30056d09
commit 1a49c85438
13 changed files with 56 additions and 123 deletions

View File

@ -61,8 +61,7 @@
import org.apache.hadoop.mapreduce.MRJobConfig;
import org.apache.hadoop.mapreduce.lib.reduce.WrappedReducer;
import org.apache.hadoop.mapreduce.task.ReduceContextImpl;
import org.apache.hadoop.yarn.util.ResourceCalculatorPlugin;
import org.apache.hadoop.yarn.util.ResourceCalculatorPlugin.*;
import org.apache.hadoop.yarn.util.ResourceCalculatorProcessTree;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.util.Progress;
import org.apache.hadoop.util.Progressable;
@ -169,7 +168,7 @@ static synchronized String getOutputName(int partition) {
private Iterator<Long> currentRecIndexIterator =
skipRanges.skipRangeIterator();
private ResourceCalculatorPlugin resourceCalculator = null;
private ResourceCalculatorProcessTree pTree;
private long initCpuCumulativeTime = 0;
protected JobConf conf;
@ -372,7 +371,7 @@ public void setSkipping(boolean skipping) {
* Return current state of the task.
* needs to be synchronized as communication thread
* sends the state every second
* @return
* @return task state
*/
synchronized TaskStatus.State getState(){
return this.taskStatus.getRunState();
@ -558,15 +557,15 @@ public void initialize(JobConf job, JobID id,
}
}
committer.setupTask(taskContext);
Class<? extends ResourceCalculatorPlugin> clazz =
conf.getClass(MRConfig.RESOURCE_CALCULATOR_PLUGIN,
null, ResourceCalculatorPlugin.class);
resourceCalculator = ResourceCalculatorPlugin
.getResourceCalculatorPlugin(clazz, conf);
LOG.info(" Using ResourceCalculatorPlugin : " + resourceCalculator);
if (resourceCalculator != null) {
initCpuCumulativeTime =
resourceCalculator.getProcResourceValues().getCumulativeCpuTime();
Class<? extends ResourceCalculatorProcessTree> clazz =
conf.getClass(MRConfig.RESOURCE_CALCULATOR_PROCESS_TREE,
null, ResourceCalculatorProcessTree.class);
pTree = ResourceCalculatorProcessTree
.getResourceCalculatorProcessTree(System.getenv().get("JVM_PID"), clazz, conf);
LOG.info(" Using ResourceCalculatorProcessTree : " + pTree);
if (pTree != null) {
pTree.updateProcessTree();
initCpuCumulativeTime = pTree.getCumulativeCpuTime();
}
}
@ -817,14 +816,14 @@ void updateResourceCounters() {
// Update generic resource counters
updateHeapUsageCounter();
// Updating resources specified in ResourceCalculatorPlugin
if (resourceCalculator == null) {
// Updating resources specified in ResourceCalculatorProcessTree
if (pTree == null) {
return;
}
ProcResourceValues res = resourceCalculator.getProcResourceValues();
long cpuTime = res.getCumulativeCpuTime();
long pMem = res.getPhysicalMemorySize();
long vMem = res.getVirtualMemorySize();
pTree.updateProcessTree();
long cpuTime = pTree.getCumulativeCpuTime();
long pMem = pTree.getCumulativeRssmem();
long vMem = pTree.getCumulativeVmem();
// Remove the CPU time consumed previously by JVM reuse
cpuTime -= initCpuCumulativeTime;
counters.findCounter(TaskCounter.CPU_MILLISECONDS).setValue(cpuTime);

View File

@ -55,8 +55,8 @@ public interface MRConfig {
public static final long DELEGATION_TOKEN_MAX_LIFETIME_DEFAULT =
7*24*60*60*1000; // 7 days
public static final String RESOURCE_CALCULATOR_PLUGIN =
"mapreduce.job.resourcecalculatorplugin";
public static final String RESOURCE_CALCULATOR_PROCESS_TREE =
"mapreduce.job.process-tree.class";
public static final String STATIC_RESOLUTIONS =
"mapreduce.job.net.static.resolutions";

View File

@ -409,7 +409,7 @@ public static void main(String[] args) {
@Override
public ProcResourceValues getProcResourceValues() {
pTree = pTree.getProcessTree();
pTree.updateProcessTree();
long cpuTime = pTree.getCumulativeCpuTime();
long pMem = pTree.getCumulativeRssmem();
long vMem = pTree.getCumulativeVmem();

View File

@ -166,12 +166,10 @@ public static boolean isAvailable() {
}
/**
* Get the process-tree with latest state. If the root-process is not alive,
* an empty tree will be returned.
*
* @return the process-tree with latest state.
* Update the process-tree with latest state. If the root-process is not alive,
* tree will become empty.
*/
public ProcfsBasedProcessTree getProcessTree() {
public void updateProcessTree() {
if (!pid.equals(deadPid)) {
// Get the list of processes
List<String> processList = getProcessList();
@ -197,7 +195,7 @@ public ProcfsBasedProcessTree getProcessTree() {
}
if (me == null) {
return this;
return;
}
// Add each process to its parent.
@ -239,7 +237,6 @@ public ProcfsBasedProcessTree getProcessTree() {
LOG.debug(this.toString());
}
}
return this;
}
/**

View File

@ -151,7 +151,7 @@ public void testProcessTree() {
ProcfsBasedProcessTree p = new ProcfsBasedProcessTree(pid,
ProcessTree.isSetsidAvailable,
ProcessTree.DEFAULT_SLEEPTIME_BEFORE_SIGKILL);
p = p.getProcessTree(); // initialize
p.updateProcessTree(); // initialize
LOG.info("ProcessTree: " + p.toString());
File leaf = new File(lowestDescendant);
@ -164,7 +164,7 @@ public void testProcessTree() {
}
}
p = p.getProcessTree(); // reconstruct
p.updateProcessTree(); // reconstruct
LOG.info("ProcessTree: " + p.toString());
// Get the process-tree dump
@ -203,7 +203,7 @@ public void testProcessTree() {
}
// ProcessTree is gone now. Any further calls should be sane.
p = p.getProcessTree();
p.updateProcessTree();
assertFalse("ProcessTree must have been gone", p.isAlive());
assertTrue("Cumulative vmem for the gone-process is "
+ p.getCumulativeVmem() + " . It should be zero.", p
@ -336,7 +336,7 @@ public void testCpuAndMemoryForProcessTree() throws IOException {
new ProcfsBasedProcessTree("100", true, 100L,
procfsRootDir.getAbsolutePath());
// build the process tree.
processTree.getProcessTree();
processTree.updateProcessTree();
// verify cumulative memory
assertEquals("Cumulative virtual memory does not match", 600000L,
@ -362,7 +362,7 @@ public void testCpuAndMemoryForProcessTree() throws IOException {
writeStatFiles(procfsRootDir, pids, procInfos);
// build the process tree.
processTree.getProcessTree();
processTree.updateProcessTree();
// verify cumulative cpu time again
cumuCpuTime = ProcfsBasedProcessTree.JIFFY_LENGTH_IN_MILLIS > 0 ?
@ -409,7 +409,7 @@ public void testMemForOlderProcesses() throws IOException {
new ProcfsBasedProcessTree("100", true, 100L,
procfsRootDir.getAbsolutePath());
// build the process tree.
processTree.getProcessTree();
processTree.updateProcessTree();
// verify cumulative memory
assertEquals("Cumulative memory does not match",
@ -425,7 +425,7 @@ public void testMemForOlderProcesses() throws IOException {
writeStatFiles(procfsRootDir, newPids, newProcInfos);
// check memory includes the new process.
processTree.getProcessTree();
processTree.updateProcessTree();
assertEquals("Cumulative vmem does not include new process",
1200000L, processTree.getCumulativeVmem());
long cumuRssMem = ProcfsBasedProcessTree.PAGE_SIZE > 0 ?
@ -451,7 +451,7 @@ public void testMemForOlderProcesses() throws IOException {
writeStatFiles(procfsRootDir, newPids, newProcInfos);
// refresh process tree
processTree.getProcessTree();
processTree.updateProcessTree();
// processes older than 2 iterations should be same as before.
assertEquals("Cumulative vmem shouldn't have included new processes",
@ -555,7 +555,7 @@ public void testProcessTreeDump()
new ProcfsBasedProcessTree("100", true, 100L, procfsRootDir
.getAbsolutePath());
// build the process tree.
processTree.getProcessTree();
processTree.updateProcessTree();
// Get the process-tree dump
String processTreeDump = processTree.getProcessTreeDump();

View File

@ -91,7 +91,6 @@ public class LinuxResourceCalculatorPlugin extends ResourceCalculatorPlugin {
private float cpuUsage = UNAVAILABLE;
private long sampleTime = UNAVAILABLE;
private long lastSampleTime = UNAVAILABLE;
private ResourceCalculatorProcessTree pTree = null;
boolean readMemInfoFile = false;
boolean readCpuInfoFile = false;
@ -109,8 +108,6 @@ public LinuxResourceCalculatorPlugin() {
procfsCpuFile = PROCFS_CPUINFO;
procfsStatFile = PROCFS_STAT;
jiffyLengthInMillis = ProcfsBasedProcessTree.JIFFY_LENGTH_IN_MILLIS;
String pid = System.getenv().get("JVM_PID");
pTree = new ProcfsBasedProcessTree(pid);
}
/**
@ -129,8 +126,6 @@ public LinuxResourceCalculatorPlugin(String procfsMemFile,
this.procfsCpuFile = procfsCpuFile;
this.procfsStatFile = procfsStatFile;
this.jiffyLengthInMillis = jiffyLengthInMillis;
String pid = System.getenv().get("JVM_PID");
pTree = new ProcfsBasedProcessTree(pid);
}
/**
@ -400,13 +395,4 @@ public static void main(String[] args) {
}
System.out.println("CPU usage % : " + plugin.getCpuUsage());
}
@Override
public ProcResourceValues getProcResourceValues() {
pTree = pTree.getProcessTree();
long cpuTime = pTree.getCumulativeCpuTime();
long pMem = pTree.getCumulativeRssmem();
long vMem = pTree.getCumulativeVmem();
return new ProcResourceValues(cpuTime, pMem, vMem);
}
}

View File

@ -140,13 +140,12 @@ public static boolean isAvailable() {
}
/**
* Get the process-tree with latest state. If the root-process is not alive,
* an empty tree will be returned.
* Update process-tree with latest state. If the root-process is not alive,
* tree will be empty.
*
* @return the process-tree with latest state.
*/
@Override
public ResourceCalculatorProcessTree getProcessTree() {
public void updateProcessTree() {
if (!pid.equals(deadPid)) {
// Get the list of processes
List<String> processList = getProcessList();
@ -172,7 +171,7 @@ public ResourceCalculatorProcessTree getProcessTree() {
}
if (me == null) {
return this;
return;
}
// Add each process to its parent.
@ -214,7 +213,6 @@ public ResourceCalculatorProcessTree getProcessTree() {
LOG.debug(this.toString());
}
}
return this;
}
/** Verify that the given process id is same as its process group id.

View File

@ -90,48 +90,6 @@ public abstract class ResourceCalculatorPlugin extends Configured {
*/
public abstract float getCpuUsage();
/**
* Obtain resource status used by current process tree.
*/
@InterfaceAudience.Private
@InterfaceStability.Unstable
public abstract ProcResourceValues getProcResourceValues();
public static class ProcResourceValues {
private final long cumulativeCpuTime;
private final long physicalMemorySize;
private final long virtualMemorySize;
public ProcResourceValues(long cumulativeCpuTime, long physicalMemorySize,
long virtualMemorySize) {
this.cumulativeCpuTime = cumulativeCpuTime;
this.physicalMemorySize = physicalMemorySize;
this.virtualMemorySize = virtualMemorySize;
}
/**
* Obtain the physical memory size used by current process tree.
* @return physical memory size in bytes.
*/
public long getPhysicalMemorySize() {
return physicalMemorySize;
}
/**
* Obtain the virtual memory size used by a current process tree.
* @return virtual memory size in bytes.
*/
public long getVirtualMemorySize() {
return virtualMemorySize;
}
/**
* Obtain the cumulative CPU time used by a current process tree.
* @return cumulative CPU time in milliseconds
*/
public long getCumulativeCpuTime() {
return cumulativeCpuTime;
}
}
/**
* Create the ResourceCalculatorPlugin from the class name and configure it. If
* class name is null, this method will try and return a memory calculator

View File

@ -43,16 +43,14 @@ public ResourceCalculatorProcessTree(String root) {
}
/**
* Get the process-tree with latest state. If the root-process is not alive,
* an empty tree will be returned.
* Update the process-tree with latest state.
*
* Each call to this function should increment the age of the running
* processes that already exist in the process tree. Age is used other API's
* of the interface.
*
* @return the process-tree with latest state.
*/
public abstract ResourceCalculatorProcessTree getProcessTree();
public abstract void updateProcessTree();
/**
* Get a dump of the process-tree.

View File

@ -161,7 +161,7 @@ public void testProcessTree() throws Exception {
String pid = getRogueTaskPID();
LOG.info("Root process pid: " + pid);
ProcfsBasedProcessTree p = createProcessTree(pid);
p.getProcessTree(); // initialize
p.updateProcessTree(); // initialize
LOG.info("ProcessTree: " + p.toString());
File leaf = new File(lowestDescendant);
@ -174,7 +174,7 @@ public void testProcessTree() throws Exception {
}
}
p.getProcessTree(); // reconstruct
p.updateProcessTree(); // reconstruct
LOG.info("ProcessTree: " + p.toString());
// Get the process-tree dump
@ -213,7 +213,7 @@ public void testProcessTree() throws Exception {
}
// ProcessTree is gone now. Any further calls should be sane.
p.getProcessTree();
p.updateProcessTree();
Assert.assertFalse("ProcessTree must have been gone", isAlive(pid));
Assert.assertTrue("Cumulative vmem for the gone-process is "
+ p.getCumulativeVmem() + " . It should be zero.", p
@ -358,7 +358,7 @@ public void testCpuAndMemoryForProcessTree() throws IOException {
ProcfsBasedProcessTree processTree =
createProcessTree("100", procfsRootDir.getAbsolutePath());
// build the process tree.
processTree.getProcessTree();
processTree.updateProcessTree();
// verify cumulative memory
Assert.assertEquals("Cumulative virtual memory does not match", 600000L,
@ -384,7 +384,7 @@ public void testCpuAndMemoryForProcessTree() throws IOException {
writeStatFiles(procfsRootDir, pids, procInfos);
// build the process tree.
processTree.getProcessTree();
processTree.updateProcessTree();
// verify cumulative cpu time again
cumuCpuTime = ProcfsBasedProcessTree.JIFFY_LENGTH_IN_MILLIS > 0 ?
@ -431,7 +431,7 @@ public void testMemForOlderProcesses() throws IOException {
ProcfsBasedProcessTree processTree =
createProcessTree("100", procfsRootDir.getAbsolutePath());
// build the process tree.
processTree.getProcessTree();
processTree.updateProcessTree();
// verify cumulative memory
Assert.assertEquals("Cumulative memory does not match",
@ -447,7 +447,7 @@ public void testMemForOlderProcesses() throws IOException {
writeStatFiles(procfsRootDir, newPids, newProcInfos);
// check memory includes the new process.
processTree.getProcessTree();
processTree.updateProcessTree();
Assert.assertEquals("Cumulative vmem does not include new process",
1200000L, processTree.getCumulativeVmem());
long cumuRssMem = ProcfsBasedProcessTree.PAGE_SIZE > 0 ?
@ -473,7 +473,7 @@ public void testMemForOlderProcesses() throws IOException {
writeStatFiles(procfsRootDir, newPids, newProcInfos);
// refresh process tree
processTree.getProcessTree();
processTree.updateProcessTree();
// processes older than 2 iterations should be same as before.
Assert.assertEquals("Cumulative vmem shouldn't have included new processes",
@ -577,7 +577,7 @@ public void testProcessTreeDump()
ProcfsBasedProcessTree processTree = createProcessTree(
"100", procfsRootDir.getAbsolutePath());
// build the process tree.
processTree.getProcessTree();
processTree.updateProcessTree();
// Get the process-tree dump
String processTreeDump = processTree.getProcessTreeDump();

View File

@ -34,8 +34,7 @@ public EmptyProcessTree(String pid) {
super(pid);
}
public ResourceCalculatorProcessTree getProcessTree() {
return this;
public void updateProcessTree() {
}
public String getProcessTreeDump() {

View File

@ -396,9 +396,7 @@ public void run() {
LOG.debug("Constructing ProcessTree for : PID = " + pId
+ " ContainerId = " + containerId);
ResourceCalculatorProcessTree pTree = ptInfo.getProcessTree();
pTree = pTree.getProcessTree(); // get the updated process-tree
ptInfo.setProcessTree(pTree); // update ptInfo with proces-tree of
// updated state
pTree.updateProcessTree(); // update process-tree
long currentVmemUsage = pTree.getCumulativeVmem();
long currentPmemUsage = pTree.getCumulativeRssmem();
// as processes begin with an age 1, we want to see if there

View File

@ -134,7 +134,7 @@ public void testProcessTreeLimits() throws IOException {
ProcfsBasedProcessTree pTree = new ProcfsBasedProcessTree(
"100",
procfsRootDir.getAbsolutePath());
pTree.getProcessTree();
pTree.updateProcessTree();
assertTrue("tree rooted at 100 should be over limit " +
"after first iteration.",
test.isProcessTreeOverLimit(pTree, "dummyId", limit));
@ -142,13 +142,13 @@ public void testProcessTreeLimits() throws IOException {
// the tree rooted at 200 is initially below limit.
pTree = new ProcfsBasedProcessTree("200",
procfsRootDir.getAbsolutePath());
pTree.getProcessTree();
pTree.updateProcessTree();
assertFalse("tree rooted at 200 shouldn't be over limit " +
"after one iteration.",
test.isProcessTreeOverLimit(pTree, "dummyId", limit));
// second iteration - now the tree has been over limit twice,
// hence it should be declared over limit.
pTree.getProcessTree();
pTree.updateProcessTree();
assertTrue(
"tree rooted at 200 should be over limit after 2 iterations",
test.isProcessTreeOverLimit(pTree, "dummyId", limit));
@ -156,12 +156,12 @@ public void testProcessTreeLimits() throws IOException {
// the tree rooted at 600 is never over limit.
pTree = new ProcfsBasedProcessTree("600",
procfsRootDir.getAbsolutePath());
pTree.getProcessTree();
pTree.updateProcessTree();
assertFalse("tree rooted at 600 should never be over limit.",
test.isProcessTreeOverLimit(pTree, "dummyId", limit));
// another iteration does not make any difference.
pTree.getProcessTree();
pTree.updateProcessTree();
assertFalse("tree rooted at 600 should never be over limit.",
test.isProcessTreeOverLimit(pTree, "dummyId", limit));
} finally {