MAPREDUCE-5077. Remove mapreduce.util.ResourceCalculatorPlugin and related code. Contributed by Karthik Kambatla.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1461254 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Siddharth Seth 2013-03-26 18:18:44 +00:00
parent 57f91f9541
commit 6e9e2f0c7f
16 changed files with 25 additions and 2416 deletions

View File

@ -90,6 +90,9 @@ Release 2.0.5-beta - UNRELEASED
MAPREDUCE-4087. [Gridmix] GenerateDistCacheData job of Gridmix can MAPREDUCE-4087. [Gridmix] GenerateDistCacheData job of Gridmix can
become slow in some cases (ravigummadi via tgraves). become slow in some cases (ravigummadi via tgraves).
MAPREDUCE-5077. Remove mapreduce.util.ResourceCalculatorPlugin and related
code. (Karthik Kambatla via sseth)
Release 2.0.4-alpha - UNRELEASED Release 2.0.4-alpha - UNRELEASED

View File

@ -1,418 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.mapreduce.util;
import java.io.BufferedReader;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStreamReader;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import com.google.common.base.Charsets;
/**
* Plugin to calculate resource information on Linux systems.
*/
@InterfaceAudience.Private
@InterfaceStability.Unstable
public class LinuxResourceCalculatorPlugin extends ResourceCalculatorPlugin {
private static final Log LOG =
LogFactory.getLog(LinuxResourceCalculatorPlugin.class);
public static final int UNAVAILABLE = -1;
/**
* proc's meminfo virtual file has keys-values in the format
* "key:[ \t]*value[ \t]kB".
*/
private static final String PROCFS_MEMFILE = "/proc/meminfo";
private static final Pattern PROCFS_MEMFILE_FORMAT =
Pattern.compile("^([a-zA-Z]*):[ \t]*([0-9]*)[ \t]kB");
// We need the values for the following keys in meminfo
private static final String MEMTOTAL_STRING = "MemTotal";
private static final String SWAPTOTAL_STRING = "SwapTotal";
private static final String MEMFREE_STRING = "MemFree";
private static final String SWAPFREE_STRING = "SwapFree";
private static final String INACTIVE_STRING = "Inactive";
/**
* Patterns for parsing /proc/cpuinfo
*/
private static final String PROCFS_CPUINFO = "/proc/cpuinfo";
private static final Pattern PROCESSOR_FORMAT =
Pattern.compile("^processor[ \t]:[ \t]*([0-9]*)");
private static final Pattern FREQUENCY_FORMAT =
Pattern.compile("^cpu MHz[ \t]*:[ \t]*([0-9.]*)");
/**
* Pattern for parsing /proc/stat
*/
private static final String PROCFS_STAT = "/proc/stat";
private static final Pattern CPU_TIME_FORMAT =
Pattern.compile("^cpu[ \t]*([0-9]*)" +
"[ \t]*([0-9]*)[ \t]*([0-9]*)[ \t].*");
private String procfsMemFile;
private String procfsCpuFile;
private String procfsStatFile;
long jiffyLengthInMillis;
private long ramSize = 0;
private long swapSize = 0;
private long ramSizeFree = 0; // free ram space on the machine (kB)
private long swapSizeFree = 0; // free swap space on the machine (kB)
private long inactiveSize = 0; // inactive cache memory (kB)
private int numProcessors = 0; // number of processors on the system
private long cpuFrequency = 0L; // CPU frequency on the system (kHz)
private long cumulativeCpuTime = 0L; // CPU used time since system is on (ms)
private long lastCumulativeCpuTime = 0L; // CPU used time read last time (ms)
// Unix timestamp while reading the CPU time (ms)
private float cpuUsage = UNAVAILABLE;
private long sampleTime = UNAVAILABLE;
private long lastSampleTime = UNAVAILABLE;
private ProcfsBasedProcessTree pTree = null;
boolean readMemInfoFile = false;
boolean readCpuInfoFile = false;
/**
* Get current time
* @return Unix time stamp in millisecond
*/
long getCurrentTime() {
return System.currentTimeMillis();
}
public LinuxResourceCalculatorPlugin() {
procfsMemFile = PROCFS_MEMFILE;
procfsCpuFile = PROCFS_CPUINFO;
procfsStatFile = PROCFS_STAT;
jiffyLengthInMillis = ProcfsBasedProcessTree.JIFFY_LENGTH_IN_MILLIS;
String pid = System.getenv().get("JVM_PID");
pTree = new ProcfsBasedProcessTree(pid);
}
/**
* Constructor which allows assigning the /proc/ directories. This will be
* used only in unit tests
* @param procfsMemFile fake file for /proc/meminfo
* @param procfsCpuFile fake file for /proc/cpuinfo
* @param procfsStatFile fake file for /proc/stat
* @param jiffyLengthInMillis fake jiffy length value
*/
public LinuxResourceCalculatorPlugin(String procfsMemFile,
String procfsCpuFile,
String procfsStatFile,
long jiffyLengthInMillis) {
this.procfsMemFile = procfsMemFile;
this.procfsCpuFile = procfsCpuFile;
this.procfsStatFile = procfsStatFile;
this.jiffyLengthInMillis = jiffyLengthInMillis;
String pid = System.getenv().get("JVM_PID");
pTree = new ProcfsBasedProcessTree(pid);
}
/**
* Read /proc/meminfo, parse and compute memory information only once
*/
private void readProcMemInfoFile() {
readProcMemInfoFile(false);
}
/**
* Read /proc/meminfo, parse and compute memory information
* @param readAgain if false, read only on the first time
*/
private void readProcMemInfoFile(boolean readAgain) {
if (readMemInfoFile && !readAgain) {
return;
}
// Read "/proc/memInfo" file
BufferedReader in = null;
InputStreamReader fReader = null;
try {
fReader = new InputStreamReader(new FileInputStream(procfsMemFile),
Charsets.UTF_8);
in = new BufferedReader(fReader);
} catch (FileNotFoundException f) {
// shouldn't happen....
return;
}
Matcher mat = null;
try {
String str = in.readLine();
while (str != null) {
mat = PROCFS_MEMFILE_FORMAT.matcher(str);
if (mat.find()) {
if (mat.group(1).equals(MEMTOTAL_STRING)) {
ramSize = Long.parseLong(mat.group(2));
} else if (mat.group(1).equals(SWAPTOTAL_STRING)) {
swapSize = Long.parseLong(mat.group(2));
} else if (mat.group(1).equals(MEMFREE_STRING)) {
ramSizeFree = Long.parseLong(mat.group(2));
} else if (mat.group(1).equals(SWAPFREE_STRING)) {
swapSizeFree = Long.parseLong(mat.group(2));
} else if (mat.group(1).equals(INACTIVE_STRING)) {
inactiveSize = Long.parseLong(mat.group(2));
}
}
str = in.readLine();
}
} catch (IOException io) {
LOG.warn("Error reading the stream " + io);
} finally {
// Close the streams
try {
fReader.close();
try {
in.close();
} catch (IOException i) {
LOG.warn("Error closing the stream " + in);
}
} catch (IOException i) {
LOG.warn("Error closing the stream " + fReader);
}
}
readMemInfoFile = true;
}
/**
* Read /proc/cpuinfo, parse and calculate CPU information
*/
private void readProcCpuInfoFile() {
// This directory needs to be read only once
if (readCpuInfoFile) {
return;
}
// Read "/proc/cpuinfo" file
BufferedReader in = null;
InputStreamReader fReader = null;
try {
fReader = new InputStreamReader(new FileInputStream(procfsCpuFile),
Charsets.UTF_8);
in = new BufferedReader(fReader);
} catch (FileNotFoundException f) {
// shouldn't happen....
return;
}
Matcher mat = null;
try {
numProcessors = 0;
String str = in.readLine();
while (str != null) {
mat = PROCESSOR_FORMAT.matcher(str);
if (mat.find()) {
numProcessors++;
}
mat = FREQUENCY_FORMAT.matcher(str);
if (mat.find()) {
cpuFrequency = (long)(Double.parseDouble(mat.group(1)) * 1000); // kHz
}
str = in.readLine();
}
} catch (IOException io) {
LOG.warn("Error reading the stream " + io);
} finally {
// Close the streams
try {
fReader.close();
try {
in.close();
} catch (IOException i) {
LOG.warn("Error closing the stream " + in);
}
} catch (IOException i) {
LOG.warn("Error closing the stream " + fReader);
}
}
readCpuInfoFile = true;
}
/**
* Read /proc/stat file, parse and calculate cumulative CPU
*/
private void readProcStatFile() {
// Read "/proc/stat" file
BufferedReader in = null;
InputStreamReader fReader = null;
try {
fReader = new InputStreamReader(new FileInputStream(procfsStatFile),
Charsets.UTF_8);
in = new BufferedReader(fReader);
} catch (FileNotFoundException f) {
// shouldn't happen....
return;
}
Matcher mat = null;
try {
String str = in.readLine();
while (str != null) {
mat = CPU_TIME_FORMAT.matcher(str);
if (mat.find()) {
long uTime = Long.parseLong(mat.group(1));
long nTime = Long.parseLong(mat.group(2));
long sTime = Long.parseLong(mat.group(3));
cumulativeCpuTime = uTime + nTime + sTime; // milliseconds
break;
}
str = in.readLine();
}
cumulativeCpuTime *= jiffyLengthInMillis;
} catch (IOException io) {
LOG.warn("Error reading the stream " + io);
} finally {
// Close the streams
try {
fReader.close();
try {
in.close();
} catch (IOException i) {
LOG.warn("Error closing the stream " + in);
}
} catch (IOException i) {
LOG.warn("Error closing the stream " + fReader);
}
}
}
/** {@inheritDoc} */
@Override
public long getPhysicalMemorySize() {
readProcMemInfoFile();
return ramSize * 1024;
}
/** {@inheritDoc} */
@Override
public long getVirtualMemorySize() {
readProcMemInfoFile();
return (ramSize + swapSize) * 1024;
}
/** {@inheritDoc} */
@Override
public long getAvailablePhysicalMemorySize() {
readProcMemInfoFile(true);
return (ramSizeFree + inactiveSize) * 1024;
}
/** {@inheritDoc} */
@Override
public long getAvailableVirtualMemorySize() {
readProcMemInfoFile(true);
return (ramSizeFree + swapSizeFree + inactiveSize) * 1024;
}
/** {@inheritDoc} */
@Override
public int getNumProcessors() {
readProcCpuInfoFile();
return numProcessors;
}
/** {@inheritDoc} */
@Override
public long getCpuFrequency() {
readProcCpuInfoFile();
return cpuFrequency;
}
/** {@inheritDoc} */
@Override
public long getCumulativeCpuTime() {
readProcStatFile();
return cumulativeCpuTime;
}
/** {@inheritDoc} */
@Override
public float getCpuUsage() {
readProcStatFile();
sampleTime = getCurrentTime();
if (lastSampleTime == UNAVAILABLE ||
lastSampleTime > sampleTime) {
// lastSampleTime > sampleTime may happen when the system time is changed
lastSampleTime = sampleTime;
lastCumulativeCpuTime = cumulativeCpuTime;
return cpuUsage;
}
// When lastSampleTime is sufficiently old, update cpuUsage.
// Also take a sample of the current time and cumulative CPU time for the
// use of the next calculation.
final long MINIMUM_UPDATE_INTERVAL = 10 * jiffyLengthInMillis;
if (sampleTime > lastSampleTime + MINIMUM_UPDATE_INTERVAL) {
cpuUsage = (float)(cumulativeCpuTime - lastCumulativeCpuTime) * 100F /
((float)(sampleTime - lastSampleTime) * getNumProcessors());
lastSampleTime = sampleTime;
lastCumulativeCpuTime = cumulativeCpuTime;
}
return cpuUsage;
}
/**
* Test the {@link LinuxResourceCalculatorPlugin}
*
* @param args
*/
public static void main(String[] args) {
LinuxResourceCalculatorPlugin plugin = new LinuxResourceCalculatorPlugin();
System.out.println("Physical memory Size (bytes) : "
+ plugin.getPhysicalMemorySize());
System.out.println("Total Virtual memory Size (bytes) : "
+ plugin.getVirtualMemorySize());
System.out.println("Available Physical memory Size (bytes) : "
+ plugin.getAvailablePhysicalMemorySize());
System.out.println("Total Available Virtual memory Size (bytes) : "
+ plugin.getAvailableVirtualMemorySize());
System.out.println("Number of Processors : " + plugin.getNumProcessors());
System.out.println("CPU frequency (kHz) : " + plugin.getCpuFrequency());
System.out.println("Cumulative CPU time (ms) : " +
plugin.getCumulativeCpuTime());
try {
// Sleep so we can compute the CPU usage
Thread.sleep(500L);
} catch (InterruptedException e) {
// do nothing
}
System.out.println("CPU usage % : " + plugin.getCpuUsage());
}
@Override
public ProcResourceValues getProcResourceValues() {
pTree.updateProcessTree();
long cpuTime = pTree.getCumulativeCpuTime();
long pMem = pTree.getCumulativeRssmem();
long vMem = pTree.getCumulativeVmem();
return new ProcResourceValues(cpuTime, pMem, vMem);
}
}

View File

@ -1,743 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.mapreduce.util;
import java.io.BufferedReader;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStreamReader;
import java.math.BigInteger;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.HashMap;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.LinkedList;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.util.Shell.ShellCommandExecutor;
import org.apache.hadoop.util.StringUtils;
import com.google.common.base.Charsets;
/**
* A Proc file-system based ProcessTree. Works only on Linux.
*/
@InterfaceAudience.Private
@InterfaceStability.Unstable
public class ProcfsBasedProcessTree extends ProcessTree {
static final Log LOG = LogFactory
.getLog(ProcfsBasedProcessTree.class);
private static final String PROCFS = "/proc/";
private static final Pattern PROCFS_STAT_FILE_FORMAT = Pattern .compile(
"^([0-9-]+)\\s([^\\s]+)\\s[^\\s]\\s([0-9-]+)\\s([0-9-]+)\\s([0-9-]+)\\s" +
"([0-9-]+\\s){7}([0-9]+)\\s([0-9]+)\\s([0-9-]+\\s){7}([0-9]+)\\s([0-9]+)" +
"(\\s[0-9-]+){15}");
static final String PROCFS_STAT_FILE = "stat";
static final String PROCFS_CMDLINE_FILE = "cmdline";
public static final long PAGE_SIZE;
static {
ShellCommandExecutor shellExecutor =
new ShellCommandExecutor(new String[]{"getconf", "PAGESIZE"});
long pageSize = -1;
try {
shellExecutor.execute();
pageSize = Long.parseLong(shellExecutor.getOutput().replace("\n", ""));
} catch (IOException e) {
LOG.error(StringUtils.stringifyException(e));
} finally {
PAGE_SIZE = pageSize;
}
}
public static final long JIFFY_LENGTH_IN_MILLIS; // in millisecond
static {
ShellCommandExecutor shellExecutor =
new ShellCommandExecutor(new String[]{"getconf", "CLK_TCK"});
long jiffiesPerSecond = -1;
try {
shellExecutor.execute();
jiffiesPerSecond = Long.parseLong(shellExecutor.getOutput().replace("\n", ""));
} catch (IOException e) {
LOG.error(StringUtils.stringifyException(e));
} finally {
JIFFY_LENGTH_IN_MILLIS = jiffiesPerSecond != -1 ?
Math.round(1000D / jiffiesPerSecond) : -1;
}
}
// to enable testing, using this variable which can be configured
// to a test directory.
private String procfsDir;
static private String deadPid = "-1";
private String pid = deadPid;
static private Pattern numberPattern = Pattern.compile("[1-9][0-9]*");
private Long cpuTime = 0L;
private boolean setsidUsed = false;
private long sleeptimeBeforeSigkill = DEFAULT_SLEEPTIME_BEFORE_SIGKILL;
private Map<String, ProcessInfo> processTree = new HashMap<String, ProcessInfo>();
public ProcfsBasedProcessTree(String pid) {
this(pid, false, DEFAULT_SLEEPTIME_BEFORE_SIGKILL);
}
public ProcfsBasedProcessTree(String pid, boolean setsidUsed,
long sigkillInterval) {
this(pid, setsidUsed, sigkillInterval, PROCFS);
}
/**
* Build a new process tree rooted at the pid.
*
* This method is provided mainly for testing purposes, where
* the root of the proc file system can be adjusted.
*
* @param pid root of the process tree
* @param setsidUsed true, if setsid was used for the root pid
* @param sigkillInterval how long to wait between a SIGTERM and SIGKILL
* when killing a process tree
* @param procfsDir the root of a proc file system - only used for testing.
*/
public ProcfsBasedProcessTree(String pid, boolean setsidUsed,
long sigkillInterval, String procfsDir) {
this.pid = getValidPID(pid);
this.setsidUsed = setsidUsed;
sleeptimeBeforeSigkill = sigkillInterval;
this.procfsDir = procfsDir;
}
/**
* Sets SIGKILL interval
* @deprecated Use {@link ProcfsBasedProcessTree#ProcfsBasedProcessTree(
* String, boolean, long)} instead
* @param interval The time to wait before sending SIGKILL
* after sending SIGTERM
*/
@Deprecated
public void setSigKillInterval(long interval) {
sleeptimeBeforeSigkill = interval;
}
/**
* Checks if the ProcfsBasedProcessTree is available on this system.
*
* @return true if ProcfsBasedProcessTree is available. False otherwise.
*/
public static boolean isAvailable() {
try {
String osName = System.getProperty("os.name");
if (!osName.startsWith("Linux")) {
LOG.info("ProcfsBasedProcessTree currently is supported only on "
+ "Linux.");
return false;
}
} catch (SecurityException se) {
LOG.warn("Failed to get Operating System name. " + se);
return false;
}
return true;
}
/**
* Update the process-tree with latest state. If the root-process is not alive,
* tree will become empty.
*/
public void updateProcessTree() {
if (!pid.equals(deadPid)) {
// Get the list of processes
List<String> processList = getProcessList();
Map<String, ProcessInfo> allProcessInfo = new HashMap<String, ProcessInfo>();
// cache the processTree to get the age for processes
Map<String, ProcessInfo> oldProcs =
new HashMap<String, ProcessInfo>(processTree);
processTree.clear();
ProcessInfo me = null;
for (String proc : processList) {
// Get information for each process
ProcessInfo pInfo = new ProcessInfo(proc);
if (constructProcessInfo(pInfo, procfsDir) != null) {
allProcessInfo.put(proc, pInfo);
if (proc.equals(this.pid)) {
me = pInfo; // cache 'me'
processTree.put(proc, pInfo);
}
}
}
if (me == null) {
return;
}
// Add each process to its parent.
for (Map.Entry<String, ProcessInfo> entry : allProcessInfo.entrySet()) {
String pID = entry.getKey();
if (!pID.equals("1")) {
ProcessInfo pInfo = entry.getValue();
ProcessInfo parentPInfo = allProcessInfo.get(pInfo.getPpid());
if (parentPInfo != null) {
parentPInfo.addChild(pInfo);
}
}
}
// now start constructing the process-tree
LinkedList<ProcessInfo> pInfoQueue = new LinkedList<ProcessInfo>();
pInfoQueue.addAll(me.getChildren());
while (!pInfoQueue.isEmpty()) {
ProcessInfo pInfo = pInfoQueue.remove();
if (!processTree.containsKey(pInfo.getPid())) {
processTree.put(pInfo.getPid(), pInfo);
}
pInfoQueue.addAll(pInfo.getChildren());
}
// update age values and compute the number of jiffies since last update
for (Map.Entry<String, ProcessInfo> procs : processTree.entrySet()) {
ProcessInfo oldInfo = oldProcs.get(procs.getKey());
if (procs.getValue() != null) {
procs.getValue().updateJiffy(oldInfo);
if (oldInfo != null) {
procs.getValue().updateAge(oldInfo);
}
}
}
if (LOG.isDebugEnabled()) {
// Log.debug the ProcfsBasedProcessTree
LOG.debug(this.toString());
}
}
}
/**
* Is the root-process alive?
*
* @return true if the root-process is alive, false otherwise.
*/
public boolean isAlive() {
if (pid.equals(deadPid)) {
return false;
} else {
return isAlive(pid);
}
}
/**
* Is any of the subprocesses in the process-tree alive?
*
* @return true if any of the processes in the process-tree is
* alive, false otherwise.
*/
public boolean isAnyProcessInTreeAlive() {
for (String pId : processTree.keySet()) {
if (isAlive(pId)) {
return true;
}
}
return false;
}
/** Verify that the given process id is same as its process group id.
* @param pidStr Process id of the to-be-verified-process
* @param procfsDir Procfs root dir
*/
static boolean checkPidPgrpidForMatch(String pidStr, String procfsDir) {
// Get information for this process
ProcessInfo pInfo = new ProcessInfo(pidStr);
pInfo = constructProcessInfo(pInfo, procfsDir);
if (pInfo == null) {
// process group leader may have finished execution, but we still need to
// kill the subProcesses in the process group.
return true;
}
String pgrpId = pInfo.getPgrpId().toString();
//make sure that pId and its pgrpId match
if (!pgrpId.equals(pidStr)) {
LOG.warn("Unexpected: Process with PID " + pidStr +
" is not a process group leader. pgrpId is: " + pInfo.getPgrpId());
return false;
}
if (LOG.isDebugEnabled()) {
LOG.debug(pidStr + " is a process group leader, as expected.");
}
return true;
}
/** Make sure that the given pid is a process group leader and then
* destroy the process group.
* @param pgrpId Process group id of to-be-killed-processes
* @param interval The time to wait before sending SIGKILL
* after sending SIGTERM
* @param inBackground Process is to be killed in the back ground with
* a separate thread
*/
public static void assertAndDestroyProcessGroup(String pgrpId, long interval,
boolean inBackground)
throws IOException {
// Make sure that the pid given is a process group leader
if (!checkPidPgrpidForMatch(pgrpId, PROCFS)) {
throw new IOException("Process with PID " + pgrpId +
" is not a process group leader.");
}
destroyProcessGroup(pgrpId, interval, inBackground);
}
/**
* Destroy the process-tree.
*/
public void destroy() {
destroy(true);
}
/**
* Destroy the process-tree.
* @param inBackground Process is to be killed in the back ground with
* a separate thread
*/
public void destroy(boolean inBackground) {
LOG.debug("Killing ProcfsBasedProcessTree of " + pid);
if (pid.equals(deadPid)) {
return;
}
if (isAlive(pid.toString())) {
if (isSetsidAvailable && setsidUsed) {
// In this case, we know that pid got created using setsid. So kill the
// whole processGroup.
try {
assertAndDestroyProcessGroup(pid.toString(), sleeptimeBeforeSigkill,
inBackground);
} catch (IOException e) {
LOG.warn(StringUtils.stringifyException(e));
}
}
else {
//TODO: Destroy all the processes in the subtree in this case also.
// For the time being, killing only the root process.
destroyProcess(pid.toString(), sleeptimeBeforeSigkill, inBackground);
}
}
}
private static final String PROCESSTREE_DUMP_FORMAT =
"\t|- %s %s %d %d %s %d %d %d %d %s%n";
/**
* Get a dump of the process-tree.
*
* @return a string concatenating the dump of information of all the processes
* in the process-tree
*/
public String getProcessTreeDump() {
StringBuilder ret = new StringBuilder();
// The header.
ret.append(String.format("\t|- PID PPID PGRPID SESSID CMD_NAME "
+ "USER_MODE_TIME(MILLIS) SYSTEM_TIME(MILLIS) VMEM_USAGE(BYTES) "
+ "RSSMEM_USAGE(PAGES) FULL_CMD_LINE%n"));
for (ProcessInfo p : processTree.values()) {
if (p != null) {
ret.append(String.format(PROCESSTREE_DUMP_FORMAT, p.getPid(), p
.getPpid(), p.getPgrpId(), p.getSessionId(), p.getName(), p
.getUtime(), p.getStime(), p.getVmem(), p.getRssmemPage(), p
.getCmdLine(procfsDir)));
}
}
return ret.toString();
}
/**
* Get the cumulative virtual memory used by all the processes in the
* process-tree.
*
* @return cumulative virtual memory used by the process-tree in bytes.
*/
public long getCumulativeVmem() {
// include all processes.. all processes will be older than 0.
return getCumulativeVmem(0);
}
/**
* Get the cumulative resident set size (rss) memory used by all the processes
* in the process-tree.
*
* @return cumulative rss memory used by the process-tree in bytes. return 0
* if it cannot be calculated
*/
public long getCumulativeRssmem() {
// include all processes.. all processes will be older than 0.
return getCumulativeRssmem(0);
}
/**
* Get the cumulative virtual memory used by all the processes in the
* process-tree that are older than the passed in age.
*
* @param olderThanAge processes above this age are included in the
* memory addition
* @return cumulative virtual memory used by the process-tree in bytes,
* for processes older than this age.
*/
public long getCumulativeVmem(int olderThanAge) {
long total = 0;
for (ProcessInfo p : processTree.values()) {
if ((p != null) && (p.getAge() > olderThanAge)) {
total += p.getVmem();
}
}
return total;
}
/**
* Get the cumulative resident set size (rss) memory used by all the processes
* in the process-tree that are older than the passed in age.
*
* @param olderThanAge processes above this age are included in the
* memory addition
* @return cumulative rss memory used by the process-tree in bytes,
* for processes older than this age. return 0 if it cannot be
* calculated
*/
public long getCumulativeRssmem(int olderThanAge) {
if (PAGE_SIZE < 0) {
return 0;
}
long totalPages = 0;
for (ProcessInfo p : processTree.values()) {
if ((p != null) && (p.getAge() > olderThanAge)) {
totalPages += p.getRssmemPage();
}
}
return totalPages * PAGE_SIZE; // convert # pages to byte
}
/**
* Get the CPU time in millisecond used by all the processes in the
* process-tree since the process-tree created
*
* @return cumulative CPU time in millisecond since the process-tree created
* return 0 if it cannot be calculated
*/
public long getCumulativeCpuTime() {
if (JIFFY_LENGTH_IN_MILLIS < 0) {
return 0;
}
long incJiffies = 0;
for (ProcessInfo p : processTree.values()) {
if (p != null) {
incJiffies += p.dtime;
}
}
cpuTime += incJiffies * JIFFY_LENGTH_IN_MILLIS;
return cpuTime;
}
private static String getValidPID(String pid) {
if (pid == null) return deadPid;
Matcher m = numberPattern.matcher(pid);
if (m.matches()) return pid;
return deadPid;
}
/**
* Get the list of all processes in the system.
*/
private List<String> getProcessList() {
String[] processDirs = (new File(procfsDir)).list();
List<String> processList = new ArrayList<String>();
for (String dir : processDirs) {
Matcher m = numberPattern.matcher(dir);
if (!m.matches()) continue;
try {
if ((new File(procfsDir, dir)).isDirectory()) {
processList.add(dir);
}
} catch (SecurityException s) {
// skip this process
}
}
return processList;
}
/**
* Construct the ProcessInfo using the process' PID and procfs rooted at the
* specified directory and return the same. It is provided mainly to assist
* testing purposes.
*
* Returns null on failing to read from procfs,
*
* @param pinfo ProcessInfo that needs to be updated
* @param procfsDir root of the proc file system
* @return updated ProcessInfo, null on errors.
*/
private static ProcessInfo constructProcessInfo(ProcessInfo pinfo,
String procfsDir) {
ProcessInfo ret = null;
// Read "procfsDir/<pid>/stat" file - typically /proc/<pid>/stat
BufferedReader in = null;
InputStreamReader fReader = null;
try {
File pidDir = new File(procfsDir, pinfo.getPid());
fReader = new InputStreamReader(new FileInputStream(
new File(pidDir, PROCFS_STAT_FILE)), Charsets.UTF_8);
in = new BufferedReader(fReader);
} catch (FileNotFoundException f) {
// The process vanished in the interim!
LOG.info("The process " + pinfo.getPid()
+ " may have finished in the interim.");
return ret;
}
ret = pinfo;
try {
String str = in.readLine(); // only one line
Matcher m = PROCFS_STAT_FILE_FORMAT.matcher(str);
boolean mat = m.find();
if (mat) {
// Set (name) (ppid) (pgrpId) (session) (utime) (stime) (vsize) (rss)
pinfo.updateProcessInfo(m.group(2), m.group(3),
Integer.parseInt(m.group(4)), Integer.parseInt(m.group(5)),
Long.parseLong(m.group(7)), new BigInteger(m.group(8)),
Long.parseLong(m.group(10)), Long.parseLong(m.group(11)));
} else {
LOG.warn("Unexpected: procfs stat file is not in the expected format"
+ " for process with pid " + pinfo.getPid());
ret = null;
}
} catch (IOException io) {
LOG.warn("Error reading the stream " + io);
ret = null;
} finally {
// Close the streams
try {
fReader.close();
try {
in.close();
} catch (IOException i) {
LOG.warn("Error closing the stream " + in);
}
} catch (IOException i) {
LOG.warn("Error closing the stream " + fReader);
}
}
return ret;
}
/**
* Returns a string printing PIDs of process present in the
* ProcfsBasedProcessTree. Output format : [pid pid ..]
*/
public String toString() {
StringBuffer pTree = new StringBuffer("[ ");
for (String p : processTree.keySet()) {
pTree.append(p);
pTree.append(" ");
}
return pTree.substring(0, pTree.length()) + "]";
}
/**
*
* Class containing information of a process.
*
*/
private static class ProcessInfo {
private String pid; // process-id
private String name; // command name
private Integer pgrpId; // process group-id
private String ppid; // parent process-id
private Integer sessionId; // session-id
private Long vmem; // virtual memory usage
private Long rssmemPage; // rss memory usage in # of pages
private Long utime = 0L; // # of jiffies in user mode
private final BigInteger MAX_LONG = BigInteger.valueOf(Long.MAX_VALUE);
private BigInteger stime = new BigInteger("0"); // # of jiffies in kernel mode
// how many times has this process been seen alive
private int age;
// # of jiffies used since last update:
private Long dtime = 0L;
// dtime = (utime + stime) - (utimeOld + stimeOld)
// We need this to compute the cumulative CPU time
// because the subprocess may finish earlier than root process
private List<ProcessInfo> children = new ArrayList<ProcessInfo>(); // list of children
public ProcessInfo(String pid) {
this.pid = pid;
// seeing this the first time.
this.age = 1;
}
public String getPid() {
return pid;
}
public String getName() {
return name;
}
public Integer getPgrpId() {
return pgrpId;
}
public String getPpid() {
return ppid;
}
public Integer getSessionId() {
return sessionId;
}
public Long getVmem() {
return vmem;
}
public Long getUtime() {
return utime;
}
public BigInteger getStime() {
return stime;
}
public Long getDtime() {
return dtime;
}
public Long getRssmemPage() { // get rss # of pages
return rssmemPage;
}
public int getAge() {
return age;
}
public boolean isParent(ProcessInfo p) {
if (pid.equals(p.getPpid())) {
return true;
}
return false;
}
public void updateProcessInfo(String name, String ppid, Integer pgrpId,
Integer sessionId, Long utime, BigInteger stime, Long vmem, Long rssmem) {
this.name = name;
this.ppid = ppid;
this.pgrpId = pgrpId;
this.sessionId = sessionId;
this.utime = utime;
this.stime = stime;
this.vmem = vmem;
this.rssmemPage = rssmem;
}
public void updateJiffy(ProcessInfo oldInfo) {
if (oldInfo == null) {
BigInteger sum = this.stime.add(BigInteger.valueOf(this.utime));
if (sum.compareTo(MAX_LONG) > 0) {
this.dtime = 0L;
LOG.warn("Sum of stime (" + this.stime + ") and utime (" + this.utime
+ ") is greater than " + Long.MAX_VALUE);
} else {
this.dtime = sum.longValue();
}
return;
}
this.dtime = (this.utime - oldInfo.utime +
this.stime.subtract(oldInfo.stime).longValue());
}
public void updateAge(ProcessInfo oldInfo) {
this.age = oldInfo.age + 1;
}
public boolean addChild(ProcessInfo p) {
return children.add(p);
}
public List<ProcessInfo> getChildren() {
return children;
}
public String getCmdLine(String procfsDir) {
String ret = "N/A";
if (pid == null) {
return ret;
}
BufferedReader in = null;
InputStreamReader fReader = null;
try {
fReader = new InputStreamReader(new FileInputStream(
new File(new File(procfsDir, pid), PROCFS_CMDLINE_FILE)),
Charsets.UTF_8);
} catch (FileNotFoundException f) {
// The process vanished in the interim!
return ret;
}
in = new BufferedReader(fReader);
try {
ret = in.readLine(); // only one line
if (ret == null) {
ret = "N/A";
} else {
ret = ret.replace('\0', ' '); // Replace each null char with a space
if (ret.equals("")) {
// The cmdline might be empty because the process is swapped out or
// is a zombie.
ret = "N/A";
}
}
} catch (IOException io) {
LOG.warn("Error reading the stream " + io);
ret = "N/A";
} finally {
// Close the streams
try {
fReader.close();
try {
in.close();
} catch (IOException i) {
LOG.warn("Error closing the stream " + in);
}
} catch (IOException i) {
LOG.warn("Error closing the stream " + fReader);
}
}
return ret;
}
}
}

View File

@ -1,165 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.mapreduce.util;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.util.ReflectionUtils;
/**
* Plugin to calculate resource information on the system.
*
*/
@InterfaceAudience.Private
@InterfaceStability.Unstable
public abstract class ResourceCalculatorPlugin extends Configured {
/**
* Obtain the total size of the virtual memory present in the system.
*
* @return virtual memory size in bytes.
*/
public abstract long getVirtualMemorySize();
/**
* Obtain the total size of the physical memory present in the system.
*
* @return physical memory size bytes.
*/
public abstract long getPhysicalMemorySize();
/**
* Obtain the total size of the available virtual memory present
* in the system.
*
* @return available virtual memory size in bytes.
*/
public abstract long getAvailableVirtualMemorySize();
/**
* Obtain the total size of the available physical memory present
* in the system.
*
* @return available physical memory size bytes.
*/
public abstract long getAvailablePhysicalMemorySize();
/**
* Obtain the total number of processors present on the system.
*
* @return number of processors
*/
public abstract int getNumProcessors();
/**
* Obtain the CPU frequency of on the system.
*
* @return CPU frequency in kHz
*/
public abstract long getCpuFrequency();
/**
* Obtain the cumulative CPU time since the system is on.
*
* @return cumulative CPU time in milliseconds
*/
public abstract long getCumulativeCpuTime();
/**
* Obtain the CPU usage % of the machine. Return -1 if it is unavailable
*
* @return CPU usage in %
*/
public abstract float getCpuUsage();
/**
* Obtain resource status used by current process tree.
*/
@InterfaceAudience.Private
@InterfaceStability.Unstable
public abstract ProcResourceValues getProcResourceValues();
public static class ProcResourceValues {
private final long cumulativeCpuTime;
private final long physicalMemorySize;
private final long virtualMemorySize;
public ProcResourceValues(long cumulativeCpuTime, long physicalMemorySize,
long virtualMemorySize) {
this.cumulativeCpuTime = cumulativeCpuTime;
this.physicalMemorySize = physicalMemorySize;
this.virtualMemorySize = virtualMemorySize;
}
/**
* Obtain the physical memory size used by current process tree.
* @return physical memory size in bytes.
*/
public long getPhysicalMemorySize() {
return physicalMemorySize;
}
/**
* Obtain the virtual memory size used by a current process tree.
* @return virtual memory size in bytes.
*/
public long getVirtualMemorySize() {
return virtualMemorySize;
}
/**
* Obtain the cumulative CPU time used by a current process tree.
* @return cumulative CPU time in milliseconds
*/
public long getCumulativeCpuTime() {
return cumulativeCpuTime;
}
}
/**
* Get the ResourceCalculatorPlugin from the class name and configure it. If
* class name is null, this method will try and return a memory calculator
* plugin available for this system.
*
* @param clazz class-name
* @param conf configure the plugin with this.
* @return ResourceCalculatorPlugin
*/
public static ResourceCalculatorPlugin getResourceCalculatorPlugin(
Class<? extends ResourceCalculatorPlugin> clazz, Configuration conf) {
if (clazz != null) {
return ReflectionUtils.newInstance(clazz, conf);
}
// No class given, try a os specific class
try {
String osName = System.getProperty("os.name");
if (osName.startsWith("Linux")) {
return new LinuxResourceCalculatorPlugin();
}
} catch (SecurityException se) {
// Failed to get Operating System name.
return null;
}
// Not supported on this system.
return null;
}
}

View File

@ -1,236 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.mapreduce.util;
import java.io.File;
import java.io.FileWriter;
import java.io.IOException;
import java.util.Random;
import junit.framework.TestCase;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.util.LinuxResourceCalculatorPlugin;
import org.junit.Test;
/**
* A JUnit test to test {@link LinuxResourceCalculatorPlugin}
* Create the fake /proc/ information and verify the parsing and calculation
*/
public class TestLinuxResourceCalculatorPlugin extends TestCase {
/**
* LinuxResourceCalculatorPlugin with a fake timer
*/
static class FakeLinuxResourceCalculatorPlugin extends
LinuxResourceCalculatorPlugin {
long currentTime = 0;
public FakeLinuxResourceCalculatorPlugin(String procfsMemFile,
String procfsCpuFile,
String procfsStatFile,
long jiffyLengthInMillis) {
super(procfsMemFile, procfsCpuFile, procfsStatFile, jiffyLengthInMillis);
}
@Override
long getCurrentTime() {
return currentTime;
}
public void advanceTime(long adv) {
currentTime += adv * jiffyLengthInMillis;
}
}
private static final FakeLinuxResourceCalculatorPlugin plugin;
private static String TEST_ROOT_DIR = new Path(System.getProperty(
"test.build.data", "/tmp")).toString().replace(' ', '+');
private static final String FAKE_MEMFILE;
private static final String FAKE_CPUFILE;
private static final String FAKE_STATFILE;
private static final long FAKE_JIFFY_LENGTH = 10L;
static {
int randomNum = (new Random()).nextInt(1000000000);
FAKE_MEMFILE = TEST_ROOT_DIR + File.separator + "MEMINFO_" + randomNum;
FAKE_CPUFILE = TEST_ROOT_DIR + File.separator + "CPUINFO_" + randomNum;
FAKE_STATFILE = TEST_ROOT_DIR + File.separator + "STATINFO_" + randomNum;
plugin = new FakeLinuxResourceCalculatorPlugin(FAKE_MEMFILE, FAKE_CPUFILE,
FAKE_STATFILE,
FAKE_JIFFY_LENGTH);
}
static final String MEMINFO_FORMAT =
"MemTotal: %d kB\n" +
"MemFree: %d kB\n" +
"Buffers: 138244 kB\n" +
"Cached: 947780 kB\n" +
"SwapCached: 142880 kB\n" +
"Active: 3229888 kB\n" +
"Inactive: %d kB\n" +
"SwapTotal: %d kB\n" +
"SwapFree: %d kB\n" +
"Dirty: 122012 kB\n" +
"Writeback: 0 kB\n" +
"AnonPages: 2710792 kB\n" +
"Mapped: 24740 kB\n" +
"Slab: 132528 kB\n" +
"SReclaimable: 105096 kB\n" +
"SUnreclaim: 27432 kB\n" +
"PageTables: 11448 kB\n" +
"NFS_Unstable: 0 kB\n" +
"Bounce: 0 kB\n" +
"CommitLimit: 4125904 kB\n" +
"Committed_AS: 4143556 kB\n" +
"VmallocTotal: 34359738367 kB\n" +
"VmallocUsed: 1632 kB\n" +
"VmallocChunk: 34359736375 kB\n" +
"HugePages_Total: 0\n" +
"HugePages_Free: 0\n" +
"HugePages_Rsvd: 0\n" +
"Hugepagesize: 2048 kB";
static final String CPUINFO_FORMAT =
"processor : %s\n" +
"vendor_id : AuthenticAMD\n" +
"cpu family : 15\n" +
"model : 33\n" +
"model name : Dual Core AMD Opteron(tm) Processor 280\n" +
"stepping : 2\n" +
"cpu MHz : %f\n" +
"cache size : 1024 KB\n" +
"physical id : 0\n" +
"siblings : 2\n" +
"core id : 0\n" +
"cpu cores : 2\n" +
"fpu : yes\n" +
"fpu_exception : yes\n" +
"cpuid level : 1\n" +
"wp : yes\n" +
"flags : fpu vme de pse tsc msr pae mce cx8 apic sep mtrr pge mca cmov " +
"pat pse36 clflush mmx fxsr sse sse2 ht syscall nx mmxext fxsr_opt lm " +
"3dnowext 3dnow pni lahf_lm cmp_legacy\n" +
"bogomips : 4792.41\n" +
"TLB size : 1024 4K pages\n" +
"clflush size : 64\n" +
"cache_alignment : 64\n" +
"address sizes : 40 bits physical, 48 bits virtual\n" +
"power management: ts fid vid ttp";
static final String STAT_FILE_FORMAT =
"cpu %d %d %d 1646495089 831319 48713 164346 0\n" +
"cpu0 15096055 30805 3823005 411456015 206027 13 14269 0\n" +
"cpu1 14760561 89890 6432036 408707910 456857 48074 130857 0\n" +
"cpu2 12761169 20842 3758639 413976772 98028 411 10288 0\n" +
"cpu3 12355207 47322 5789691 412354390 70406 213 8931 0\n" +
"intr 114648668 20010764 2 0 945665 2 0 0 0 0 0 0 0 4 0 0 0 0 0 0\n" +
"ctxt 242017731764\n" +
"btime 1257808753\n" +
"processes 26414943\n" +
"procs_running 1\n" +
"procs_blocked 0\n";
/**
* Test parsing /proc/stat and /proc/cpuinfo
* @throws IOException
*/
@Test
public void testParsingProcStatAndCpuFile() throws IOException {
// Write fake /proc/cpuinfo file.
long numProcessors = 8;
long cpuFrequencyKHz = 2392781;
String fileContent = "";
for (int i = 0; i < numProcessors; i++) {
fileContent += String.format(CPUINFO_FORMAT, i, cpuFrequencyKHz / 1000D) +
"\n";
}
File tempFile = new File(FAKE_CPUFILE);
tempFile.deleteOnExit();
FileWriter fWriter = new FileWriter(FAKE_CPUFILE);
fWriter.write(fileContent);
fWriter.close();
assertEquals(plugin.getNumProcessors(), numProcessors);
assertEquals(plugin.getCpuFrequency(), cpuFrequencyKHz);
// Write fake /proc/stat file.
long uTime = 54972994;
long nTime = 188860;
long sTime = 19803373;
tempFile = new File(FAKE_STATFILE);
tempFile.deleteOnExit();
updateStatFile(uTime, nTime, sTime);
assertEquals(plugin.getCumulativeCpuTime(),
FAKE_JIFFY_LENGTH * (uTime + nTime + sTime));
assertEquals(plugin.getCpuUsage(), (float)(LinuxResourceCalculatorPlugin.UNAVAILABLE));
// Advance the time and sample again to test the CPU usage calculation
uTime += 100L;
plugin.advanceTime(200L);
updateStatFile(uTime, nTime, sTime);
assertEquals(plugin.getCumulativeCpuTime(),
FAKE_JIFFY_LENGTH * (uTime + nTime + sTime));
assertEquals(plugin.getCpuUsage(), 6.25F);
// Advance the time and sample again. This time, we call getCpuUsage() only.
uTime += 600L;
plugin.advanceTime(300L);
updateStatFile(uTime, nTime, sTime);
assertEquals(plugin.getCpuUsage(), 25F);
// Advance very short period of time (one jiffy length).
// In this case, CPU usage should not be updated.
uTime += 1L;
plugin.advanceTime(1L);
updateStatFile(uTime, nTime, sTime);
assertEquals(plugin.getCumulativeCpuTime(),
FAKE_JIFFY_LENGTH * (uTime + nTime + sTime));
assertEquals(plugin.getCpuUsage(), 25F); // CPU usage is not updated.
}
/**
* Write information to fake /proc/stat file
*/
private void updateStatFile(long uTime, long nTime, long sTime)
throws IOException {
FileWriter fWriter = new FileWriter(FAKE_STATFILE);
fWriter.write(String.format(STAT_FILE_FORMAT, uTime, nTime, sTime));
fWriter.close();
}
/**
* Test parsing /proc/meminfo
* @throws IOException
*/
@Test
public void testParsingProcMemFile() throws IOException {
long memTotal = 4058864L;
long memFree = 99632L;
long inactive = 567732L;
long swapTotal = 2096472L;
long swapFree = 1818480L;
File tempFile = new File(FAKE_MEMFILE);
tempFile.deleteOnExit();
FileWriter fWriter = new FileWriter(FAKE_MEMFILE);
fWriter.write(String.format(MEMINFO_FORMAT,
memTotal, memFree, inactive, swapTotal, swapFree));
fWriter.close();
assertEquals(plugin.getAvailablePhysicalMemorySize(),
1024L * (memFree + inactive));
assertEquals(plugin.getAvailableVirtualMemorySize(),
1024L * (memFree + inactive + swapFree));
assertEquals(plugin.getPhysicalMemorySize(), 1024L * memTotal);
assertEquals(plugin.getVirtualMemorySize(), 1024L * (memTotal + swapTotal));
}
}

View File

@ -1,51 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.mapreduce.util;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
/**
* Plugin to calculate virtual and physical memories on Linux systems.
* @deprecated
* Use {@link org.apache.hadoop.mapreduce.util.LinuxResourceCalculatorPlugin}
* instead
*/
@Deprecated
@InterfaceAudience.Private
@InterfaceStability.Unstable
public class LinuxMemoryCalculatorPlugin extends MemoryCalculatorPlugin {
private LinuxResourceCalculatorPlugin resourceCalculatorPlugin;
// Use everything from LinuxResourceCalculatorPlugin
public LinuxMemoryCalculatorPlugin() {
resourceCalculatorPlugin = new LinuxResourceCalculatorPlugin();
}
/** {@inheritDoc} */
@Override
public long getPhysicalMemorySize() {
return resourceCalculatorPlugin.getPhysicalMemorySize();
}
/** {@inheritDoc} */
@Override
public long getVirtualMemorySize() {
return resourceCalculatorPlugin.getVirtualMemorySize();
}
}

View File

@ -1,82 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.mapreduce.util;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.util.ReflectionUtils;
/**
* Plugin to calculate virtual and physical memories on the system.
* @deprecated Use
* {@link org.apache.hadoop.mapreduce.util.ResourceCalculatorPlugin}
* instead
*/
@Deprecated
@InterfaceAudience.Private
@InterfaceStability.Unstable
public abstract class MemoryCalculatorPlugin extends Configured {
/**
* Obtain the total size of the virtual memory present in the system.
*
* @return virtual memory size in bytes.
*/
public abstract long getVirtualMemorySize();
/**
* Obtain the total size of the physical memory present in the system.
*
* @return physical memory size bytes.
*/
public abstract long getPhysicalMemorySize();
/**
* Get the MemoryCalculatorPlugin from the class name and configure it. If
* class name is null, this method will try and return a memory calculator
* plugin available for this system.
*
* @param clazz class-name
* @param conf configure the plugin with this.
* @return MemoryCalculatorPlugin
*/
public static MemoryCalculatorPlugin getMemoryCalculatorPlugin(
Class<? extends MemoryCalculatorPlugin> clazz, Configuration conf) {
if (clazz != null) {
return ReflectionUtils.newInstance(clazz, conf);
}
// No class given, try a os specific class
try {
String osName = System.getProperty("os.name");
if (osName.startsWith("Linux")) {
return new LinuxMemoryCalculatorPlugin();
}
} catch (SecurityException se) {
// Failed to get Operating System name.
return null;
}
// Not supported on this system.
return null;
}
}

View File

@ -1,677 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.mapreduce.util;
import java.io.BufferedReader;
import java.io.BufferedWriter;
import java.io.File;
import java.io.FileNotFoundException;
import java.io.FileReader;
import java.io.FileWriter;
import java.io.IOException;
import java.util.Arrays;
import java.util.Random;
import java.util.Vector;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.util.Shell.ExitCodeException;
import org.apache.hadoop.util.Shell.ShellCommandExecutor;
import junit.framework.TestCase;
/**
* A JUnit test to test ProcfsBasedProcessTree.
*/
public class TestProcfsBasedProcessTree extends TestCase {
private static final Log LOG = LogFactory
.getLog(TestProcfsBasedProcessTree.class);
private static String TEST_ROOT_DIR = new Path(System.getProperty(
"test.build.data", "/tmp")).toString().replace(' ', '+');
private ShellCommandExecutor shexec = null;
private String pidFile, lowestDescendant;
private String shellScript;
private static final int N = 6; // Controls the RogueTask
private class RogueTaskThread extends Thread {
public void run() {
try {
Vector<String> args = new Vector<String>();
if(ProcessTree.isSetsidAvailable) {
args.add("setsid");
}
args.add("bash");
args.add("-c");
args.add(" echo $$ > " + pidFile + "; sh " +
shellScript + " " + N + ";") ;
shexec = new ShellCommandExecutor(args.toArray(new String[0]));
shexec.execute();
} catch (ExitCodeException ee) {
LOG.info("Shell Command exit with a non-zero exit code. This is" +
" expected as we are killing the subprocesses of the" +
" task intentionally. " + ee);
} catch (IOException ioe) {
LOG.info("Error executing shell command " + ioe);
} finally {
LOG.info("Exit code: " + shexec.getExitCode());
}
}
}
private String getRogueTaskPID() {
File f = new File(pidFile);
while (!f.exists()) {
try {
Thread.sleep(500);
} catch (InterruptedException ie) {
break;
}
}
// read from pidFile
return getPidFromPidFile(pidFile);
}
public void testProcessTree() {
try {
if (!ProcfsBasedProcessTree.isAvailable()) {
System.out
.println("ProcfsBasedProcessTree is not available on this system. Not testing");
return;
}
} catch (Exception e) {
LOG.info(StringUtils.stringifyException(e));
return;
}
// create shell script
Random rm = new Random();
File tempFile = new File(TEST_ROOT_DIR, this.getName() + "_shellScript_" +
rm.nextInt() + ".sh");
tempFile.deleteOnExit();
shellScript = TEST_ROOT_DIR + File.separator + tempFile.getName();
// create pid file
tempFile = new File(TEST_ROOT_DIR, this.getName() + "_pidFile_" +
rm.nextInt() + ".pid");
tempFile.deleteOnExit();
pidFile = TEST_ROOT_DIR + File.separator + tempFile.getName();
lowestDescendant = TEST_ROOT_DIR + File.separator + "lowestDescendantPidFile";
// write to shell-script
try {
FileWriter fWriter = new FileWriter(shellScript);
fWriter.write(
"# rogue task\n" +
"sleep 1\n" +
"echo hello\n" +
"if [ $1 -ne 0 ]\n" +
"then\n" +
" sh " + shellScript + " $(($1-1))\n" +
"else\n" +
" echo $$ > " + lowestDescendant + "\n" +
" while true\n do\n" +
" sleep 5\n" +
" done\n" +
"fi");
fWriter.close();
} catch (IOException ioe) {
LOG.info("Error: " + ioe);
return;
}
Thread t = new RogueTaskThread();
t.start();
String pid = getRogueTaskPID();
LOG.info("Root process pid: " + pid);
ProcfsBasedProcessTree p = new ProcfsBasedProcessTree(pid,
ProcessTree.isSetsidAvailable,
ProcessTree.DEFAULT_SLEEPTIME_BEFORE_SIGKILL);
p.updateProcessTree(); // initialize
LOG.info("ProcessTree: " + p.toString());
File leaf = new File(lowestDescendant);
//wait till lowest descendant process of Rougue Task starts execution
while (!leaf.exists()) {
try {
Thread.sleep(500);
} catch (InterruptedException ie) {
break;
}
}
p.updateProcessTree(); // reconstruct
LOG.info("ProcessTree: " + p.toString());
// Get the process-tree dump
String processTreeDump = p.getProcessTreeDump();
// destroy the process and all its subprocesses
p.destroy(true/*in the background*/);
if(ProcessTree.isSetsidAvailable) {// whole processtree should be gone
assertEquals(false, p.isAnyProcessInTreeAlive());
}
else {// process should be gone
assertFalse("ProcessTree must have been gone", p.isAlive());
}
LOG.info("Process-tree dump follows: \n" + processTreeDump);
assertTrue("Process-tree dump doesn't start with a proper header",
processTreeDump.startsWith("\t|- PID PPID PGRPID SESSID CMD_NAME " +
"USER_MODE_TIME(MILLIS) SYSTEM_TIME(MILLIS) VMEM_USAGE(BYTES) " +
"RSSMEM_USAGE(PAGES) FULL_CMD_LINE\n"));
for (int i = N; i >= 0; i--) {
String cmdLineDump = "\\|- [0-9]+ [0-9]+ [0-9]+ [0-9]+ \\(sh\\)" +
" [0-9]+ [0-9]+ [0-9]+ [0-9]+ sh " + shellScript + " " + i;
Pattern pat = Pattern.compile(cmdLineDump);
Matcher mat = pat.matcher(processTreeDump);
assertTrue("Process-tree dump doesn't contain the cmdLineDump of " + i
+ "th process!", mat.find());
}
// Not able to join thread sometimes when forking with large N.
try {
t.join(2000);
LOG.info("RogueTaskThread successfully joined.");
} catch (InterruptedException ie) {
LOG.info("Interrupted while joining RogueTaskThread.");
}
// ProcessTree is gone now. Any further calls should be sane.
p.updateProcessTree();
assertFalse("ProcessTree must have been gone", p.isAlive());
assertTrue("Cumulative vmem for the gone-process is "
+ p.getCumulativeVmem() + " . It should be zero.", p
.getCumulativeVmem() == 0);
assertTrue(p.toString().equals("[ ]"));
}
/**
* Get PID from a pid-file.
*
* @param pidFileName
* Name of the pid-file.
* @return the PID string read from the pid-file. Returns null if the
* pidFileName points to a non-existing file or if read fails from the
* file.
*/
public static String getPidFromPidFile(String pidFileName) {
BufferedReader pidFile = null;
FileReader fReader = null;
String pid = null;
try {
fReader = new FileReader(pidFileName);
pidFile = new BufferedReader(fReader);
} catch (FileNotFoundException f) {
LOG.debug("PidFile doesn't exist : " + pidFileName);
return pid;
}
try {
pid = pidFile.readLine();
} catch (IOException i) {
LOG.error("Failed to read from " + pidFileName);
} finally {
try {
if (fReader != null) {
fReader.close();
}
try {
if (pidFile != null) {
pidFile.close();
}
} catch (IOException i) {
LOG.warn("Error closing the stream " + pidFile);
}
} catch (IOException i) {
LOG.warn("Error closing the stream " + fReader);
}
}
return pid;
}
public static class ProcessStatInfo {
// sample stat in a single line : 3910 (gpm) S 1 3910 3910 0 -1 4194624
// 83 0 0 0 0 0 0 0 16 0 1 0 7852 2408448 88 4294967295 134512640
// 134590050 3220521392 3220520036 10975138 0 0 4096 134234626
// 4294967295 0 0 17 1 0 0
String pid;
String name;
String ppid;
String pgrpId;
String session;
String vmem = "0";
String rssmemPage = "0";
String utime = "0";
String stime = "0";
public ProcessStatInfo(String[] statEntries) {
pid = statEntries[0];
name = statEntries[1];
ppid = statEntries[2];
pgrpId = statEntries[3];
session = statEntries[4];
vmem = statEntries[5];
if (statEntries.length > 6) {
rssmemPage = statEntries[6];
}
if (statEntries.length > 7) {
utime = statEntries[7];
stime = statEntries[8];
}
}
// construct a line that mimics the procfs stat file.
// all unused numerical entries are set to 0.
public String getStatLine() {
return String.format("%s (%s) S %s %s %s 0 0 0" +
" 0 0 0 0 %s %s 0 0 0 0 0 0 0 %s %s 0 0" +
" 0 0 0 0 0 0 0 0" +
" 0 0 0 0 0",
pid, name, ppid, pgrpId, session,
utime, stime, vmem, rssmemPage);
}
}
/**
* A basic test that creates a few process directories and writes
* stat files. Verifies that the cpu time and memory is correctly
* computed.
* @throws IOException if there was a problem setting up the
* fake procfs directories or files.
*/
public void testCpuAndMemoryForProcessTree() throws IOException {
// test processes
String[] pids = { "100", "200", "300", "400" };
// create the fake procfs root directory.
File procfsRootDir = new File(TEST_ROOT_DIR, "proc");
try {
setupProcfsRootDir(procfsRootDir);
setupPidDirs(procfsRootDir, pids);
// create stat objects.
// assuming processes 100, 200, 300 are in tree and 400 is not.
ProcessStatInfo[] procInfos = new ProcessStatInfo[4];
procInfos[0] = new ProcessStatInfo(new String[]
{"100", "proc1", "1", "100", "100", "100000", "100", "1000", "200"});
procInfos[1] = new ProcessStatInfo(new String[]
{"200", "proc2", "100", "100", "100", "200000", "200", "2000", "400"});
procInfos[2] = new ProcessStatInfo(new String[]
{"300", "proc3", "200", "100", "100", "300000", "300", "3000", "600"});
procInfos[3] = new ProcessStatInfo(new String[]
{"400", "proc4", "1", "400", "400", "400000", "400", "4000", "800"});
writeStatFiles(procfsRootDir, pids, procInfos);
// crank up the process tree class.
ProcfsBasedProcessTree processTree =
new ProcfsBasedProcessTree("100", true, 100L,
procfsRootDir.getAbsolutePath());
// build the process tree.
processTree.updateProcessTree();
// verify cumulative memory
assertEquals("Cumulative virtual memory does not match", 600000L,
processTree.getCumulativeVmem());
// verify rss memory
long cumuRssMem = ProcfsBasedProcessTree.PAGE_SIZE > 0 ?
600L * ProcfsBasedProcessTree.PAGE_SIZE : 0L;
assertEquals("Cumulative rss memory does not match",
cumuRssMem, processTree.getCumulativeRssmem());
// verify cumulative cpu time
long cumuCpuTime = ProcfsBasedProcessTree.JIFFY_LENGTH_IN_MILLIS > 0 ?
7200L * ProcfsBasedProcessTree.JIFFY_LENGTH_IN_MILLIS : 0L;
assertEquals("Cumulative cpu time does not match",
cumuCpuTime, processTree.getCumulativeCpuTime());
// test the cpu time again to see if it cumulates
procInfos[0] = new ProcessStatInfo(new String[]
{"100", "proc1", "1", "100", "100", "100000", "100", "2000", "300"});
procInfos[1] = new ProcessStatInfo(new String[]
{"200", "proc2", "100", "100", "100", "200000", "200", "3000", "500"});
writeStatFiles(procfsRootDir, pids, procInfos);
// build the process tree.
processTree.updateProcessTree();
// verify cumulative cpu time again
cumuCpuTime = ProcfsBasedProcessTree.JIFFY_LENGTH_IN_MILLIS > 0 ?
9400L * ProcfsBasedProcessTree.JIFFY_LENGTH_IN_MILLIS : 0L;
assertEquals("Cumulative cpu time does not match",
cumuCpuTime, processTree.getCumulativeCpuTime());
} finally {
FileUtil.fullyDelete(procfsRootDir);
}
}
/**
* Tests that cumulative memory is computed only for
* processes older than a given age.
* @throws IOException if there was a problem setting up the
* fake procfs directories or files.
*/
public void testMemForOlderProcesses() throws IOException {
// initial list of processes
String[] pids = { "100", "200", "300", "400" };
// create the fake procfs root directory.
File procfsRootDir = new File(TEST_ROOT_DIR, "proc");
try {
setupProcfsRootDir(procfsRootDir);
setupPidDirs(procfsRootDir, pids);
// create stat objects.
// assuming 100, 200 and 400 are in tree, 300 is not.
ProcessStatInfo[] procInfos = new ProcessStatInfo[4];
procInfos[0] = new ProcessStatInfo(new String[]
{"100", "proc1", "1", "100", "100", "100000", "100"});
procInfos[1] = new ProcessStatInfo(new String[]
{"200", "proc2", "100", "100", "100", "200000", "200"});
procInfos[2] = new ProcessStatInfo(new String[]
{"300", "proc3", "1", "300", "300", "300000", "300"});
procInfos[3] = new ProcessStatInfo(new String[]
{"400", "proc4", "100", "100", "100", "400000", "400"});
writeStatFiles(procfsRootDir, pids, procInfos);
// crank up the process tree class.
ProcfsBasedProcessTree processTree =
new ProcfsBasedProcessTree("100", true, 100L,
procfsRootDir.getAbsolutePath());
// build the process tree.
processTree.updateProcessTree();
// verify cumulative memory
assertEquals("Cumulative memory does not match",
700000L, processTree.getCumulativeVmem());
// write one more process as child of 100.
String[] newPids = { "500" };
setupPidDirs(procfsRootDir, newPids);
ProcessStatInfo[] newProcInfos = new ProcessStatInfo[1];
newProcInfos[0] = new ProcessStatInfo(new String[]
{"500", "proc5", "100", "100", "100", "500000", "500"});
writeStatFiles(procfsRootDir, newPids, newProcInfos);
// check memory includes the new process.
processTree.updateProcessTree();
assertEquals("Cumulative vmem does not include new process",
1200000L, processTree.getCumulativeVmem());
long cumuRssMem = ProcfsBasedProcessTree.PAGE_SIZE > 0 ?
1200L * ProcfsBasedProcessTree.PAGE_SIZE : 0L;
assertEquals("Cumulative rssmem does not include new process",
cumuRssMem, processTree.getCumulativeRssmem());
// however processes older than 1 iteration will retain the older value
assertEquals("Cumulative vmem shouldn't have included new process",
700000L, processTree.getCumulativeVmem(1));
cumuRssMem = ProcfsBasedProcessTree.PAGE_SIZE > 0 ?
700L * ProcfsBasedProcessTree.PAGE_SIZE : 0L;
assertEquals("Cumulative rssmem shouldn't have included new process",
cumuRssMem, processTree.getCumulativeRssmem(1));
// one more process
newPids = new String[]{ "600" };
setupPidDirs(procfsRootDir, newPids);
newProcInfos = new ProcessStatInfo[1];
newProcInfos[0] = new ProcessStatInfo(new String[]
{"600", "proc6", "100", "100", "100", "600000", "600"});
writeStatFiles(procfsRootDir, newPids, newProcInfos);
// refresh process tree
processTree.updateProcessTree();
// processes older than 2 iterations should be same as before.
assertEquals("Cumulative vmem shouldn't have included new processes",
700000L, processTree.getCumulativeVmem(2));
cumuRssMem = ProcfsBasedProcessTree.PAGE_SIZE > 0 ?
700L * ProcfsBasedProcessTree.PAGE_SIZE : 0L;
assertEquals("Cumulative rssmem shouldn't have included new processes",
cumuRssMem, processTree.getCumulativeRssmem(2));
// processes older than 1 iteration should not include new process,
// but include process 500
assertEquals("Cumulative vmem shouldn't have included new processes",
1200000L, processTree.getCumulativeVmem(1));
cumuRssMem = ProcfsBasedProcessTree.PAGE_SIZE > 0 ?
1200L * ProcfsBasedProcessTree.PAGE_SIZE : 0L;
assertEquals("Cumulative rssmem shouldn't have included new processes",
cumuRssMem, processTree.getCumulativeRssmem(1));
// no processes older than 3 iterations, this should be 0
assertEquals("Getting non-zero vmem for processes older than 3 iterations",
0L, processTree.getCumulativeVmem(3));
assertEquals("Getting non-zero rssmem for processes older than 3 iterations",
0L, processTree.getCumulativeRssmem(3));
} finally {
FileUtil.fullyDelete(procfsRootDir);
}
}
/**
* Verifies ProcfsBasedProcessTree.checkPidPgrpidForMatch() in case of
* 'constructProcessInfo() returning null' by not writing stat file for the
* mock process
* @throws IOException if there was a problem setting up the
* fake procfs directories or files.
*/
public void testDestroyProcessTree() throws IOException {
// test process
String pid = "100";
// create the fake procfs root directory.
File procfsRootDir = new File(TEST_ROOT_DIR, "proc");
try {
setupProcfsRootDir(procfsRootDir);
// crank up the process tree class.
ProcfsBasedProcessTree processTree = new ProcfsBasedProcessTree(
pid, true, 100L, procfsRootDir.getAbsolutePath());
// Let us not create stat file for pid 100.
assertTrue(ProcfsBasedProcessTree.checkPidPgrpidForMatch(
pid, procfsRootDir.getAbsolutePath()));
} finally {
FileUtil.fullyDelete(procfsRootDir);
}
}
/**
* Test the correctness of process-tree dump.
*
* @throws IOException
*/
public void testProcessTreeDump()
throws IOException {
String[] pids = { "100", "200", "300", "400", "500", "600" };
File procfsRootDir = new File(TEST_ROOT_DIR, "proc");
try {
setupProcfsRootDir(procfsRootDir);
setupPidDirs(procfsRootDir, pids);
int numProcesses = pids.length;
// Processes 200, 300, 400 and 500 are descendants of 100. 600 is not.
ProcessStatInfo[] procInfos = new ProcessStatInfo[numProcesses];
procInfos[0] = new ProcessStatInfo(new String[] {
"100", "proc1", "1", "100", "100", "100000", "100", "1000", "200"});
procInfos[1] = new ProcessStatInfo(new String[] {
"200", "proc2", "100", "100", "100", "200000", "200", "2000", "400"});
procInfos[2] = new ProcessStatInfo(new String[] {
"300", "proc3", "200", "100", "100", "300000", "300", "3000", "600"});
procInfos[3] = new ProcessStatInfo(new String[] {
"400", "proc4", "200", "100", "100", "400000", "400", "4000", "800"});
procInfos[4] = new ProcessStatInfo(new String[] {
"500", "proc5", "400", "100", "100", "400000", "400", "4000", "800"});
procInfos[5] = new ProcessStatInfo(new String[] {
"600", "proc6", "1", "1", "1", "400000", "400", "4000", "800"});
String[] cmdLines = new String[numProcesses];
cmdLines[0] = "proc1 arg1 arg2";
cmdLines[1] = "proc2 arg3 arg4";
cmdLines[2] = "proc3 arg5 arg6";
cmdLines[3] = "proc4 arg7 arg8";
cmdLines[4] = "proc5 arg9 arg10";
cmdLines[5] = "proc6 arg11 arg12";
writeStatFiles(procfsRootDir, pids, procInfos);
writeCmdLineFiles(procfsRootDir, pids, cmdLines);
ProcfsBasedProcessTree processTree =
new ProcfsBasedProcessTree("100", true, 100L, procfsRootDir
.getAbsolutePath());
// build the process tree.
processTree.updateProcessTree();
// Get the process-tree dump
String processTreeDump = processTree.getProcessTreeDump();
LOG.info("Process-tree dump follows: \n" + processTreeDump);
assertTrue("Process-tree dump doesn't start with a proper header",
processTreeDump.startsWith("\t|- PID PPID PGRPID SESSID CMD_NAME " +
"USER_MODE_TIME(MILLIS) SYSTEM_TIME(MILLIS) VMEM_USAGE(BYTES) " +
"RSSMEM_USAGE(PAGES) FULL_CMD_LINE\n"));
for (int i = 0; i < 5; i++) {
ProcessStatInfo p = procInfos[i];
assertTrue(
"Process-tree dump doesn't contain the cmdLineDump of process "
+ p.pid, processTreeDump.contains("\t|- " + p.pid + " "
+ p.ppid + " " + p.pgrpId + " " + p.session + " (" + p.name
+ ") " + p.utime + " " + p.stime + " " + p.vmem + " "
+ p.rssmemPage + " " + cmdLines[i]));
}
// 600 should not be in the dump
ProcessStatInfo p = procInfos[5];
assertFalse(
"Process-tree dump shouldn't contain the cmdLineDump of process "
+ p.pid, processTreeDump.contains("\t|- " + p.pid + " " + p.ppid
+ " " + p.pgrpId + " " + p.session + " (" + p.name + ") "
+ p.utime + " " + p.stime + " " + p.vmem + " " + cmdLines[5]));
} finally {
FileUtil.fullyDelete(procfsRootDir);
}
}
/**
* Create a directory to mimic the procfs file system's root.
* @param procfsRootDir root directory to create.
* @throws IOException if could not delete the procfs root directory
*/
public static void setupProcfsRootDir(File procfsRootDir) {
// cleanup any existing process root dir.
if (procfsRootDir.exists()) {
assertTrue(FileUtil.fullyDelete(procfsRootDir));
}
// create afresh
assertTrue(procfsRootDir.mkdirs());
}
/**
* Create PID directories under the specified procfs root directory
* @param procfsRootDir root directory of procfs file system
* @param pids the PID directories to create.
* @throws IOException If PID dirs could not be created
*/
public static void setupPidDirs(File procfsRootDir, String[] pids)
throws IOException {
for (String pid : pids) {
File pidDir = new File(procfsRootDir, pid);
pidDir.mkdir();
if (!pidDir.exists()) {
throw new IOException ("couldn't make process directory under " +
"fake procfs");
} else {
LOG.info("created pid dir");
}
}
}
/**
* Write stat files under the specified pid directories with data
* setup in the corresponding ProcessStatInfo objects
* @param procfsRootDir root directory of procfs file system
* @param pids the PID directories under which to create the stat file
* @param procs corresponding ProcessStatInfo objects whose data should be
* written to the stat files.
* @throws IOException if stat files could not be written
*/
public static void writeStatFiles(File procfsRootDir, String[] pids,
ProcessStatInfo[] procs) throws IOException {
for (int i=0; i<pids.length; i++) {
File statFile =
new File(new File(procfsRootDir, pids[i]),
ProcfsBasedProcessTree.PROCFS_STAT_FILE);
BufferedWriter bw = null;
try {
FileWriter fw = new FileWriter(statFile);
bw = new BufferedWriter(fw);
bw.write(procs[i].getStatLine());
LOG.info("wrote stat file for " + pids[i] +
" with contents: " + procs[i].getStatLine());
} finally {
// not handling exception - will throw an error and fail the test.
if (bw != null) {
bw.close();
}
}
}
}
private static void writeCmdLineFiles(File procfsRootDir, String[] pids,
String[] cmdLines)
throws IOException {
for (int i = 0; i < pids.length; i++) {
File statFile =
new File(new File(procfsRootDir, pids[i]),
ProcfsBasedProcessTree.PROCFS_CMDLINE_FILE);
BufferedWriter bw = null;
try {
bw = new BufferedWriter(new FileWriter(statFile));
bw.write(cmdLines[i]);
LOG.info("wrote command-line file for " + pids[i] + " with contents: "
+ cmdLines[i]);
} finally {
// not handling exception - will throw an error and fail the test.
if (bw != null) {
bw.close();
}
}
}
}
}

View File

@ -36,11 +36,11 @@ import org.apache.hadoop.mapreduce.TaskInputOutputContext;
import org.apache.hadoop.mapreduce.TaskType; import org.apache.hadoop.mapreduce.TaskType;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.server.tasktracker.TTConfig; import org.apache.hadoop.mapreduce.server.tasktracker.TTConfig;
import org.apache.hadoop.mapreduce.util.ResourceCalculatorPlugin;
import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.tools.rumen.JobStory; import org.apache.hadoop.tools.rumen.JobStory;
import org.apache.hadoop.tools.rumen.ResourceUsageMetrics; import org.apache.hadoop.tools.rumen.ResourceUsageMetrics;
import org.apache.hadoop.tools.rumen.TaskInfo; import org.apache.hadoop.tools.rumen.TaskInfo;
import org.apache.hadoop.yarn.util.ResourceCalculatorPlugin;
import java.io.IOException; import java.io.IOException;
import java.security.PrivilegedExceptionAction; import java.security.PrivilegedExceptionAction;

View File

@ -22,8 +22,8 @@ import java.util.Random;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapred.gridmix.Progressive; import org.apache.hadoop.mapred.gridmix.Progressive;
import org.apache.hadoop.mapreduce.util.ResourceCalculatorPlugin;
import org.apache.hadoop.tools.rumen.ResourceUsageMetrics; import org.apache.hadoop.tools.rumen.ResourceUsageMetrics;
import org.apache.hadoop.yarn.util.ResourceCalculatorPlugin;
/** /**
* <p>A {@link ResourceUsageEmulatorPlugin} that emulates the cumulative CPU * <p>A {@link ResourceUsageEmulatorPlugin} that emulates the cumulative CPU
@ -166,7 +166,7 @@ implements ResourceUsageEmulatorPlugin {
*/ */
public void calibrate(ResourceCalculatorPlugin monitor, public void calibrate(ResourceCalculatorPlugin monitor,
long totalCpuUsage) { long totalCpuUsage) {
long initTime = monitor.getProcResourceValues().getCumulativeCpuTime(); long initTime = monitor.getCumulativeCpuTime();
long defaultLoopSize = 0; long defaultLoopSize = 0;
long finalTime = initTime; long finalTime = initTime;
@ -175,7 +175,7 @@ implements ResourceUsageEmulatorPlugin {
while (finalTime - initTime < 100) { // 100 ms while (finalTime - initTime < 100) { // 100 ms
++defaultLoopSize; ++defaultLoopSize;
performUnitComputation(); //perform unit computation performUnitComputation(); //perform unit computation
finalTime = monitor.getProcResourceValues().getCumulativeCpuTime(); finalTime = monitor.getCumulativeCpuTime();
} }
long referenceRuntime = finalTime - initTime; long referenceRuntime = finalTime - initTime;
@ -250,7 +250,7 @@ implements ResourceUsageEmulatorPlugin {
// section // section
long currentCpuUsage = long currentCpuUsage =
monitor.getProcResourceValues().getCumulativeCpuTime(); monitor.getCumulativeCpuTime();
// estimate the cpu usage rate // estimate the cpu usage rate
float rate = (currentCpuUsage - lastSeenCpuUsageCpuUsage) float rate = (currentCpuUsage - lastSeenCpuUsageCpuUsage)
/ (currentProgress - lastSeenProgress); / (currentProgress - lastSeenProgress);
@ -264,7 +264,7 @@ implements ResourceUsageEmulatorPlugin {
(long)(targetCpuUsage (long)(targetCpuUsage
* getWeightForProgressInterval(currentProgress)); * getWeightForProgressInterval(currentProgress));
while (monitor.getProcResourceValues().getCumulativeCpuTime() while (monitor.getCumulativeCpuTime()
< currentWeighedTarget) { < currentWeighedTarget) {
emulatorCore.compute(); emulatorCore.compute();
// sleep for 100ms // sleep for 100ms
@ -282,7 +282,7 @@ implements ResourceUsageEmulatorPlugin {
lastSeenProgress = progress.getProgress(); lastSeenProgress = progress.getProgress();
// set the last seen usage // set the last seen usage
lastSeenCpuUsageCpuUsage = lastSeenCpuUsageCpuUsage =
monitor.getProcResourceValues().getCumulativeCpuTime(); monitor.getCumulativeCpuTime();
} }
} }
} }

View File

@ -20,7 +20,7 @@ package org.apache.hadoop.mapred.gridmix.emulators.resourceusage;
import java.io.IOException; import java.io.IOException;
import org.apache.hadoop.mapred.gridmix.Progressive; import org.apache.hadoop.mapred.gridmix.Progressive;
import org.apache.hadoop.mapreduce.util.ResourceCalculatorPlugin; import org.apache.hadoop.yarn.util.ResourceCalculatorPlugin;
import org.apache.hadoop.tools.rumen.ResourceUsageMetrics; import org.apache.hadoop.tools.rumen.ResourceUsageMetrics;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;

View File

@ -22,9 +22,9 @@ import java.util.List;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapred.gridmix.Progressive; import org.apache.hadoop.mapred.gridmix.Progressive;
import org.apache.hadoop.mapreduce.util.ResourceCalculatorPlugin;
import org.apache.hadoop.tools.rumen.ResourceUsageMetrics; import org.apache.hadoop.tools.rumen.ResourceUsageMetrics;
import org.apache.hadoop.util.ReflectionUtils; import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hadoop.yarn.util.ResourceCalculatorPlugin;
/** /**
* <p>This is the driver class for managing all the resource usage emulators. * <p>This is the driver class for managing all the resource usage emulators.

View File

@ -21,8 +21,8 @@ import java.io.IOException;
import java.util.ArrayList; import java.util.ArrayList;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapred.gridmix.Progressive; import org.apache.hadoop.mapred.gridmix.Progressive;
import org.apache.hadoop.mapreduce.util.ResourceCalculatorPlugin;
import org.apache.hadoop.tools.rumen.ResourceUsageMetrics; import org.apache.hadoop.tools.rumen.ResourceUsageMetrics;
import org.apache.hadoop.yarn.util.ResourceCalculatorPlugin;
/** /**
* <p>A {@link ResourceUsageEmulatorPlugin} that emulates the total heap * <p>A {@link ResourceUsageEmulatorPlugin} that emulates the total heap

View File

@ -16,18 +16,17 @@
* limitations under the License. * limitations under the License.
*/ */
package org.apache.hadoop.mapred; package org.apache.hadoop.mapred.gridmix;
import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.mapreduce.util.ResourceCalculatorPlugin; import org.apache.hadoop.yarn.util.ResourceCalculatorPlugin;
/** /**
* Plugin class to test resource information reported by TT. Use * Plugin class to test resource information reported by NM. Use configuration
* configuration items {@link #MAXVMEM_TESTING_PROPERTY} and * items {@link #MAXVMEM_TESTING_PROPERTY} and {@link #MAXPMEM_TESTING_PROPERTY}
* {@link #MAXPMEM_TESTING_PROPERTY} to tell TT the total vmem and the total * to tell NM the total vmem and the total pmem. Use configuration items
* pmem. Use configuration items {@link #NUM_PROCESSORS}, * {@link #NUM_PROCESSORS}, {@link #CPU_FREQUENCY}, {@link #CUMULATIVE_CPU_TIME}
* {@link #CPU_FREQUENCY}, {@link #CUMULATIVE_CPU_TIME} and {@link #CPU_USAGE} * and {@link #CPU_USAGE} to tell TT the CPU information.
* to tell TT the CPU information.
*/ */
@InterfaceAudience.Private @InterfaceAudience.Private
public class DummyResourceCalculatorPlugin extends ResourceCalculatorPlugin { public class DummyResourceCalculatorPlugin extends ResourceCalculatorPlugin {
@ -48,15 +47,14 @@ public class DummyResourceCalculatorPlugin extends ResourceCalculatorPlugin {
public static final String CUMULATIVE_CPU_TIME = public static final String CUMULATIVE_CPU_TIME =
"mapred.tasktracker.cumulativecputime.testing"; "mapred.tasktracker.cumulativecputime.testing";
/** CPU usage percentage for testing */ /** CPU usage percentage for testing */
public static final String CPU_USAGE = public static final String CPU_USAGE = "mapred.tasktracker.cpuusage.testing";
"mapred.tasktracker.cpuusage.testing";
/** process cumulative CPU usage time for testing */ /** process cumulative CPU usage time for testing */
public static final String PROC_CUMULATIVE_CPU_TIME = public static final String PROC_CUMULATIVE_CPU_TIME =
"mapred.tasktracker.proccumulativecputime.testing"; "mapred.tasktracker.proccumulativecputime.testing";
/** process pmem for testing*/ /** process pmem for testing */
public static final String PROC_PMEM_TESTING_PROPERTY = public static final String PROC_PMEM_TESTING_PROPERTY =
"mapred.tasktracker.procpmem.testing"; "mapred.tasktracker.procpmem.testing";
/** process vmem for testing*/ /** process vmem for testing */
public static final String PROC_VMEM_TESTING_PROPERTY = public static final String PROC_VMEM_TESTING_PROPERTY =
"mapred.tasktracker.procvmem.testing"; "mapred.tasktracker.procvmem.testing";
@ -107,12 +105,4 @@ public class DummyResourceCalculatorPlugin extends ResourceCalculatorPlugin {
public float getCpuUsage() { public float getCpuUsage() {
return getConf().getFloat(CPU_USAGE, -1); return getConf().getFloat(CPU_USAGE, -1);
} }
@Override
public ProcResourceValues getProcResourceValues() {
long cpuTime = getConf().getLong(PROC_CUMULATIVE_CPU_TIME, -1);
long pMem = getConf().getLong(PROC_PMEM_TESTING_PROPERTY, -1);
long vMem = getConf().getLong(PROC_VMEM_TESTING_PROPERTY, -1);
return new ProcResourceValues(cpuTime, pMem, vMem);
}
} }

View File

@ -23,7 +23,6 @@ import static org.junit.Assert.*;
import java.io.IOException; import java.io.IOException;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapred.DummyResourceCalculatorPlugin;
import org.apache.hadoop.mapred.JobConf; import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.gridmix.DebugJobProducer.MockJob; import org.apache.hadoop.mapred.gridmix.DebugJobProducer.MockJob;
import org.apache.hadoop.mapred.gridmix.TestHighRamJob.DummyGridmixJob; import org.apache.hadoop.mapred.gridmix.TestHighRamJob.DummyGridmixJob;
@ -32,8 +31,8 @@ import org.apache.hadoop.mapred.gridmix.emulators.resourceusage.TotalHeapUsageEm
import org.apache.hadoop.mapred.gridmix.emulators.resourceusage.TotalHeapUsageEmulatorPlugin.DefaultHeapUsageEmulator; import org.apache.hadoop.mapred.gridmix.emulators.resourceusage.TotalHeapUsageEmulatorPlugin.DefaultHeapUsageEmulator;
import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.MRJobConfig; import org.apache.hadoop.mapreduce.MRJobConfig;
import org.apache.hadoop.mapreduce.util.ResourceCalculatorPlugin;
import org.apache.hadoop.tools.rumen.ResourceUsageMetrics; import org.apache.hadoop.tools.rumen.ResourceUsageMetrics;
import org.apache.hadoop.yarn.util.ResourceCalculatorPlugin;
/** /**
* Test Gridmix memory emulation. * Test Gridmix memory emulation.

View File

@ -31,14 +31,13 @@ import org.apache.hadoop.mapreduce.TaskInputOutputContext;
import org.apache.hadoop.mapreduce.TaskType; import org.apache.hadoop.mapreduce.TaskType;
import org.apache.hadoop.mapreduce.server.tasktracker.TTConfig; import org.apache.hadoop.mapreduce.server.tasktracker.TTConfig;
import org.apache.hadoop.mapreduce.task.MapContextImpl; import org.apache.hadoop.mapreduce.task.MapContextImpl;
import org.apache.hadoop.mapreduce.util.ResourceCalculatorPlugin;
import org.apache.hadoop.tools.rumen.ResourceUsageMetrics; import org.apache.hadoop.tools.rumen.ResourceUsageMetrics;
import org.apache.hadoop.mapred.DummyResourceCalculatorPlugin;
import org.apache.hadoop.mapred.gridmix.LoadJob.ResourceUsageMatcherRunner; import org.apache.hadoop.mapred.gridmix.LoadJob.ResourceUsageMatcherRunner;
import org.apache.hadoop.mapred.gridmix.emulators.resourceusage.CumulativeCpuUsageEmulatorPlugin; import org.apache.hadoop.mapred.gridmix.emulators.resourceusage.CumulativeCpuUsageEmulatorPlugin;
import org.apache.hadoop.mapred.gridmix.emulators.resourceusage.ResourceUsageEmulatorPlugin; import org.apache.hadoop.mapred.gridmix.emulators.resourceusage.ResourceUsageEmulatorPlugin;
import org.apache.hadoop.mapred.gridmix.emulators.resourceusage.ResourceUsageMatcher; import org.apache.hadoop.mapred.gridmix.emulators.resourceusage.ResourceUsageMatcher;
import org.apache.hadoop.mapred.gridmix.emulators.resourceusage.CumulativeCpuUsageEmulatorPlugin.DefaultCpuUsageEmulator; import org.apache.hadoop.mapred.gridmix.emulators.resourceusage.CumulativeCpuUsageEmulatorPlugin.DefaultCpuUsageEmulator;
import org.apache.hadoop.yarn.util.ResourceCalculatorPlugin;
/** /**
* Test Gridmix's resource emulator framework and supported plugins. * Test Gridmix's resource emulator framework and supported plugins.
@ -234,16 +233,6 @@ public class TestResourceUsageEmulators {
public long getCumulativeCpuTime() { public long getCumulativeCpuTime() {
return core.getCpuUsage(); return core.getCpuUsage();
} }
/**
* Returns a {@link ProcResourceValues} with cumulative cpu usage
* computed using {@link #getCumulativeCpuTime()}.
*/
@Override
public ProcResourceValues getProcResourceValues() {
long usageValue = getCumulativeCpuTime();
return new ProcResourceValues(usageValue, -1, -1);
}
} }
/** /**