HADOOP-12180. Move ResourceCalculatorPlugin from YARN to Common. (Chris Douglas via kasha)

(cherry picked from commit ac6048372a)
This commit is contained in:
Karthik Kambatla 2015-07-09 09:56:40 -07:00
parent ccf18705f7
commit fc989ebe16
14 changed files with 987 additions and 760 deletions

View File

@ -189,6 +189,9 @@ Release 2.8.0 - UNRELEASED
HADOOP-12201. Add tracing to FileSystem#createFileSystem and Globber#glob
(cmccabe)
HADOOP-12180. Move ResourceCalculatorPlugin from YARN to Common.
(Chris Douglas via kasha)
OPTIMIZATIONS
HADOOP-11785. Reduce the number of listStatus operation in distcp

View File

@ -16,38 +16,40 @@
* limitations under the License.
*/
package org.apache.hadoop.yarn.util;
package org.apache.hadoop.util;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import java.math.BigInteger;
/**
* Utility for sampling and computing CPU usage.
*/
@InterfaceAudience.Private
@InterfaceStability.Unstable
public class CpuTimeTracker {
public static final int UNAVAILABLE =
ResourceCalculatorProcessTree.UNAVAILABLE;
final long MINIMUM_UPDATE_INTERVAL;
public static final int UNAVAILABLE = -1;
private final long minimumTimeInterval;
// CPU used time since system is on (ms)
BigInteger cumulativeCpuTime = BigInteger.ZERO;
private BigInteger cumulativeCpuTime = BigInteger.ZERO;
// CPU used time read last time (ms)
BigInteger lastCumulativeCpuTime = BigInteger.ZERO;
private BigInteger lastCumulativeCpuTime = BigInteger.ZERO;
// Unix timestamp while reading the CPU time (ms)
long sampleTime;
long lastSampleTime;
float cpuUsage;
BigInteger jiffyLengthInMillis;
private long sampleTime;
private long lastSampleTime;
private float cpuUsage;
private BigInteger jiffyLengthInMillis;
public CpuTimeTracker(long jiffyLengthInMillis) {
this.jiffyLengthInMillis = BigInteger.valueOf(jiffyLengthInMillis);
this.cpuUsage = UNAVAILABLE;
this.sampleTime = UNAVAILABLE;
this.lastSampleTime = UNAVAILABLE;
MINIMUM_UPDATE_INTERVAL = 10 * jiffyLengthInMillis;
minimumTimeInterval = 10 * jiffyLengthInMillis;
}
/**
@ -58,7 +60,7 @@ public class CpuTimeTracker {
*
* @return Return percentage of cpu usage since last update, {@link
* CpuTimeTracker#UNAVAILABLE} if there haven't been 2 updates more than
* {@link CpuTimeTracker#MINIMUM_UPDATE_INTERVAL} apart
* {@link CpuTimeTracker#minimumTimeInterval} apart
*/
public float getCpuTrackerUsagePercent() {
if (lastSampleTime == UNAVAILABLE ||
@ -71,7 +73,7 @@ public class CpuTimeTracker {
// When lastSampleTime is sufficiently old, update cpuUsage.
// Also take a sample of the current time and cumulative CPU time for the
// use of the next calculation.
if (sampleTime > lastSampleTime + MINIMUM_UPDATE_INTERVAL) {
if (sampleTime > lastSampleTime + minimumTimeInterval) {
cpuUsage =
((cumulativeCpuTime.subtract(lastCumulativeCpuTime)).floatValue())
* 100F / ((float) (sampleTime - lastSampleTime));
@ -81,9 +83,22 @@ public class CpuTimeTracker {
return cpuUsage;
}
public void updateElapsedJiffies(BigInteger elapedJiffies, long sampleTime) {
this.cumulativeCpuTime = elapedJiffies.multiply(jiffyLengthInMillis);
this.sampleTime = sampleTime;
/**
* Obtain the cumulative CPU time since the system is on.
* @return cumulative CPU time in milliseconds
*/
public long getCumulativeCpuTime() {
return cumulativeCpuTime.longValue();
}
/**
* Apply delta to accumulators.
* @param elapsedJiffies updated jiffies
* @param newTime new sample time
*/
public void updateElapsedJiffies(BigInteger elapsedJiffies, long newTime) {
cumulativeCpuTime = elapsedJiffies.multiply(jiffyLengthInMillis);
sampleTime = newTime;
}
@Override
@ -97,4 +112,4 @@ public class CpuTimeTracker {
sb.append(" JiffyLengthMillisec " + this.jiffyLengthInMillis);
return sb.toString();
}
}
}

View File

@ -0,0 +1,111 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.util;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
/**
* Plugin to calculate resource information on the system.
*/
@InterfaceAudience.Public
@InterfaceStability.Evolving
public abstract class SysInfo {
/**
* Return default OS instance.
* @throws UnsupportedOperationException If cannot determine OS.
* @return Default instance for the detected OS.
*/
public static SysInfo newInstance() {
if (Shell.LINUX) {
return new SysInfoLinux();
}
if (Shell.WINDOWS) {
return new SysInfoWindows();
}
throw new UnsupportedOperationException("Could not determine OS");
}
/**
* Obtain the total size of the virtual memory present in the system.
*
* @return virtual memory size in bytes.
*/
public abstract long getVirtualMemorySize();
/**
* Obtain the total size of the physical memory present in the system.
*
* @return physical memory size bytes.
*/
public abstract long getPhysicalMemorySize();
/**
* Obtain the total size of the available virtual memory present
* in the system.
*
* @return available virtual memory size in bytes.
*/
public abstract long getAvailableVirtualMemorySize();
/**
* Obtain the total size of the available physical memory present
* in the system.
*
* @return available physical memory size bytes.
*/
public abstract long getAvailablePhysicalMemorySize();
/**
* Obtain the total number of logical processors present on the system.
*
* @return number of logical processors
*/
public abstract int getNumProcessors();
/**
* Obtain total number of physical cores present on the system.
*
* @return number of physical cores
*/
public abstract int getNumCores();
/**
* Obtain the CPU frequency of on the system.
*
* @return CPU frequency in kHz
*/
public abstract long getCpuFrequency();
/**
* Obtain the cumulative CPU time since the system is on.
*
* @return cumulative CPU time in milliseconds
*/
public abstract long getCumulativeCpuTime();
/**
* Obtain the CPU usage % of the machine. Return -1 if it is unavailable
*
* @return CPU usage as a percentage of available cycles.
*/
public abstract float getCpuUsage();
}

View File

@ -0,0 +1,444 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.util;
import java.io.BufferedReader;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.InputStreamReader;
import java.io.IOException;
import java.math.BigInteger;
import java.nio.charset.Charset;
import java.util.HashSet;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import com.google.common.annotations.VisibleForTesting;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.util.Shell.ShellCommandExecutor;
/**
* Plugin to calculate resource information on Linux systems.
*/
@InterfaceAudience.Private
@InterfaceStability.Evolving
public class SysInfoLinux extends SysInfo {
private static final Log LOG =
LogFactory.getLog(SysInfoLinux.class);
/**
* proc's meminfo virtual file has keys-values in the format
* "key:[ \t]*value[ \t]kB".
*/
private static final String PROCFS_MEMFILE = "/proc/meminfo";
private static final Pattern PROCFS_MEMFILE_FORMAT =
Pattern.compile("^([a-zA-Z]*):[ \t]*([0-9]*)[ \t]kB");
// We need the values for the following keys in meminfo
private static final String MEMTOTAL_STRING = "MemTotal";
private static final String SWAPTOTAL_STRING = "SwapTotal";
private static final String MEMFREE_STRING = "MemFree";
private static final String SWAPFREE_STRING = "SwapFree";
private static final String INACTIVE_STRING = "Inactive";
/**
* Patterns for parsing /proc/cpuinfo.
*/
private static final String PROCFS_CPUINFO = "/proc/cpuinfo";
private static final Pattern PROCESSOR_FORMAT =
Pattern.compile("^processor[ \t]:[ \t]*([0-9]*)");
private static final Pattern FREQUENCY_FORMAT =
Pattern.compile("^cpu MHz[ \t]*:[ \t]*([0-9.]*)");
private static final Pattern PHYSICAL_ID_FORMAT =
Pattern.compile("^physical id[ \t]*:[ \t]*([0-9]*)");
private static final Pattern CORE_ID_FORMAT =
Pattern.compile("^core id[ \t]*:[ \t]*([0-9]*)");
/**
* Pattern for parsing /proc/stat.
*/
private static final String PROCFS_STAT = "/proc/stat";
private static final Pattern CPU_TIME_FORMAT =
Pattern.compile("^cpu[ \t]*([0-9]*)" +
"[ \t]*([0-9]*)[ \t]*([0-9]*)[ \t].*");
private CpuTimeTracker cpuTimeTracker;
private String procfsMemFile;
private String procfsCpuFile;
private String procfsStatFile;
private long jiffyLengthInMillis;
private long ramSize = 0;
private long swapSize = 0;
private long ramSizeFree = 0; // free ram space on the machine (kB)
private long swapSizeFree = 0; // free swap space on the machine (kB)
private long inactiveSize = 0; // inactive cache memory (kB)
/* number of logical processors on the system. */
private int numProcessors = 0;
/* number of physical cores on the system. */
private int numCores = 0;
private long cpuFrequency = 0L; // CPU frequency on the system (kHz)
private boolean readMemInfoFile = false;
private boolean readCpuInfoFile = false;
public static final long PAGE_SIZE = getConf("PAGESIZE");
public static final long JIFFY_LENGTH_IN_MILLIS =
Math.max(Math.round(1000D / getConf("CLK_TCK")), -1);
private static long getConf(String attr) {
if(Shell.LINUX) {
try {
ShellCommandExecutor shellExecutorClk = new ShellCommandExecutor(
new String[] {"getconf", attr });
shellExecutorClk.execute();
return Long.parseLong(shellExecutorClk.getOutput().replace("\n", ""));
} catch (IOException|NumberFormatException e) {
return -1;
}
}
return -1;
}
/**
* Get current time.
* @return Unix time stamp in millisecond
*/
long getCurrentTime() {
return System.currentTimeMillis();
}
public SysInfoLinux() {
this(PROCFS_MEMFILE, PROCFS_CPUINFO, PROCFS_STAT,
JIFFY_LENGTH_IN_MILLIS);
}
/**
* Constructor which allows assigning the /proc/ directories. This will be
* used only in unit tests.
* @param procfsMemFile fake file for /proc/meminfo
* @param procfsCpuFile fake file for /proc/cpuinfo
* @param procfsStatFile fake file for /proc/stat
* @param jiffyLengthInMillis fake jiffy length value
*/
@VisibleForTesting
public SysInfoLinux(String procfsMemFile,
String procfsCpuFile,
String procfsStatFile,
long jiffyLengthInMillis) {
this.procfsMemFile = procfsMemFile;
this.procfsCpuFile = procfsCpuFile;
this.procfsStatFile = procfsStatFile;
this.jiffyLengthInMillis = jiffyLengthInMillis;
this.cpuTimeTracker = new CpuTimeTracker(jiffyLengthInMillis);
}
/**
* Read /proc/meminfo, parse and compute memory information only once.
*/
private void readProcMemInfoFile() {
readProcMemInfoFile(false);
}
/**
* Read /proc/meminfo, parse and compute memory information.
* @param readAgain if false, read only on the first time
*/
private void readProcMemInfoFile(boolean readAgain) {
if (readMemInfoFile && !readAgain) {
return;
}
// Read "/proc/memInfo" file
BufferedReader in;
InputStreamReader fReader;
try {
fReader = new InputStreamReader(
new FileInputStream(procfsMemFile), Charset.forName("UTF-8"));
in = new BufferedReader(fReader);
} catch (FileNotFoundException f) {
// shouldn't happen....
LOG.warn("Couldn't read " + procfsMemFile
+ "; can't determine memory settings");
return;
}
Matcher mat;
try {
String str = in.readLine();
while (str != null) {
mat = PROCFS_MEMFILE_FORMAT.matcher(str);
if (mat.find()) {
if (mat.group(1).equals(MEMTOTAL_STRING)) {
ramSize = Long.parseLong(mat.group(2));
} else if (mat.group(1).equals(SWAPTOTAL_STRING)) {
swapSize = Long.parseLong(mat.group(2));
} else if (mat.group(1).equals(MEMFREE_STRING)) {
ramSizeFree = Long.parseLong(mat.group(2));
} else if (mat.group(1).equals(SWAPFREE_STRING)) {
swapSizeFree = Long.parseLong(mat.group(2));
} else if (mat.group(1).equals(INACTIVE_STRING)) {
inactiveSize = Long.parseLong(mat.group(2));
}
}
str = in.readLine();
}
} catch (IOException io) {
LOG.warn("Error reading the stream " + io);
} finally {
// Close the streams
try {
fReader.close();
try {
in.close();
} catch (IOException i) {
LOG.warn("Error closing the stream " + in);
}
} catch (IOException i) {
LOG.warn("Error closing the stream " + fReader);
}
}
readMemInfoFile = true;
}
/**
* Read /proc/cpuinfo, parse and calculate CPU information.
*/
private void readProcCpuInfoFile() {
// This directory needs to be read only once
if (readCpuInfoFile) {
return;
}
HashSet<String> coreIdSet = new HashSet<>();
// Read "/proc/cpuinfo" file
BufferedReader in;
InputStreamReader fReader;
try {
fReader = new InputStreamReader(
new FileInputStream(procfsCpuFile), Charset.forName("UTF-8"));
in = new BufferedReader(fReader);
} catch (FileNotFoundException f) {
// shouldn't happen....
LOG.warn("Couldn't read " + procfsCpuFile + "; can't determine cpu info");
return;
}
Matcher mat;
try {
numProcessors = 0;
numCores = 1;
String currentPhysicalId = "";
String str = in.readLine();
while (str != null) {
mat = PROCESSOR_FORMAT.matcher(str);
if (mat.find()) {
numProcessors++;
}
mat = FREQUENCY_FORMAT.matcher(str);
if (mat.find()) {
cpuFrequency = (long)(Double.parseDouble(mat.group(1)) * 1000); // kHz
}
mat = PHYSICAL_ID_FORMAT.matcher(str);
if (mat.find()) {
currentPhysicalId = str;
}
mat = CORE_ID_FORMAT.matcher(str);
if (mat.find()) {
coreIdSet.add(currentPhysicalId + " " + str);
numCores = coreIdSet.size();
}
str = in.readLine();
}
} catch (IOException io) {
LOG.warn("Error reading the stream " + io);
} finally {
// Close the streams
try {
fReader.close();
try {
in.close();
} catch (IOException i) {
LOG.warn("Error closing the stream " + in);
}
} catch (IOException i) {
LOG.warn("Error closing the stream " + fReader);
}
}
readCpuInfoFile = true;
}
/**
* Read /proc/stat file, parse and calculate cumulative CPU.
*/
private void readProcStatFile() {
// Read "/proc/stat" file
BufferedReader in;
InputStreamReader fReader;
try {
fReader = new InputStreamReader(
new FileInputStream(procfsStatFile), Charset.forName("UTF-8"));
in = new BufferedReader(fReader);
} catch (FileNotFoundException f) {
// shouldn't happen....
return;
}
Matcher mat;
try {
String str = in.readLine();
while (str != null) {
mat = CPU_TIME_FORMAT.matcher(str);
if (mat.find()) {
long uTime = Long.parseLong(mat.group(1));
long nTime = Long.parseLong(mat.group(2));
long sTime = Long.parseLong(mat.group(3));
cpuTimeTracker.updateElapsedJiffies(
BigInteger.valueOf(uTime + nTime + sTime),
getCurrentTime());
break;
}
str = in.readLine();
}
} catch (IOException io) {
LOG.warn("Error reading the stream " + io);
} finally {
// Close the streams
try {
fReader.close();
try {
in.close();
} catch (IOException i) {
LOG.warn("Error closing the stream " + in);
}
} catch (IOException i) {
LOG.warn("Error closing the stream " + fReader);
}
}
}
/** {@inheritDoc} */
@Override
public long getPhysicalMemorySize() {
readProcMemInfoFile();
return ramSize * 1024;
}
/** {@inheritDoc} */
@Override
public long getVirtualMemorySize() {
readProcMemInfoFile();
return (ramSize + swapSize) * 1024;
}
/** {@inheritDoc} */
@Override
public long getAvailablePhysicalMemorySize() {
readProcMemInfoFile(true);
return (ramSizeFree + inactiveSize) * 1024;
}
/** {@inheritDoc} */
@Override
public long getAvailableVirtualMemorySize() {
readProcMemInfoFile(true);
return (ramSizeFree + swapSizeFree + inactiveSize) * 1024;
}
/** {@inheritDoc} */
@Override
public int getNumProcessors() {
readProcCpuInfoFile();
return numProcessors;
}
/** {@inheritDoc} */
@Override
public int getNumCores() {
readProcCpuInfoFile();
return numCores;
}
/** {@inheritDoc} */
@Override
public long getCpuFrequency() {
readProcCpuInfoFile();
return cpuFrequency;
}
/** {@inheritDoc} */
@Override
public long getCumulativeCpuTime() {
readProcStatFile();
return cpuTimeTracker.getCumulativeCpuTime();
}
/** {@inheritDoc} */
@Override
public float getCpuUsage() {
readProcStatFile();
float overallCpuUsage = cpuTimeTracker.getCpuTrackerUsagePercent();
if (overallCpuUsage != CpuTimeTracker.UNAVAILABLE) {
overallCpuUsage = overallCpuUsage / getNumProcessors();
}
return overallCpuUsage;
}
/**
* Test the {@link SysInfoLinux}.
*
* @param args - arguments to this calculator test
*/
public static void main(String[] args) {
SysInfoLinux plugin = new SysInfoLinux();
System.out.println("Physical memory Size (bytes) : "
+ plugin.getPhysicalMemorySize());
System.out.println("Total Virtual memory Size (bytes) : "
+ plugin.getVirtualMemorySize());
System.out.println("Available Physical memory Size (bytes) : "
+ plugin.getAvailablePhysicalMemorySize());
System.out.println("Total Available Virtual memory Size (bytes) : "
+ plugin.getAvailableVirtualMemorySize());
System.out.println("Number of Processors : " + plugin.getNumProcessors());
System.out.println("CPU frequency (kHz) : " + plugin.getCpuFrequency());
System.out.println("Cumulative CPU time (ms) : " +
plugin.getCumulativeCpuTime());
try {
// Sleep so we can compute the CPU usage
Thread.sleep(500L);
} catch (InterruptedException e) {
// do nothing
}
System.out.println("CPU usage % : " + plugin.getCpuUsage());
}
@VisibleForTesting
void setReadCpuInfoFile(boolean readCpuInfoFileValue) {
this.readCpuInfoFile = readCpuInfoFileValue;
}
public long getJiffyLengthInMillis() {
return this.jiffyLengthInMillis;
}
}

View File

@ -0,0 +1,181 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.util;
import java.io.IOException;
import com.google.common.annotations.VisibleForTesting;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.util.Shell.ShellCommandExecutor;
/**
* Plugin to calculate resource information on Windows systems.
*/
@InterfaceAudience.Private
@InterfaceStability.Evolving
public class SysInfoWindows extends SysInfo {
private static final Log LOG = LogFactory.getLog(SysInfoWindows.class);
private long vmemSize;
private long memSize;
private long vmemAvailable;
private long memAvailable;
private int numProcessors;
private long cpuFrequencyKhz;
private long cumulativeCpuTimeMs;
private float cpuUsage;
private long lastRefreshTime;
static final int REFRESH_INTERVAL_MS = 1000;
public SysInfoWindows() {
lastRefreshTime = 0;
reset();
}
@VisibleForTesting
long now() {
return System.nanoTime();
}
void reset() {
vmemSize = -1;
memSize = -1;
vmemAvailable = -1;
memAvailable = -1;
numProcessors = -1;
cpuFrequencyKhz = -1;
cumulativeCpuTimeMs = -1;
cpuUsage = -1;
}
String getSystemInfoInfoFromShell() {
ShellCommandExecutor shellExecutor = new ShellCommandExecutor(
new String[] {Shell.WINUTILS, "systeminfo" });
try {
shellExecutor.execute();
return shellExecutor.getOutput();
} catch (IOException e) {
LOG.error(StringUtils.stringifyException(e));
}
return null;
}
void refreshIfNeeded() {
long now = now();
if (now - lastRefreshTime > REFRESH_INTERVAL_MS) {
long refreshInterval = now - lastRefreshTime;
lastRefreshTime = now;
long lastCumCpuTimeMs = cumulativeCpuTimeMs;
reset();
String sysInfoStr = getSystemInfoInfoFromShell();
if (sysInfoStr != null) {
final int sysInfoSplitCount = 7;
String[] sysInfo = sysInfoStr.substring(0, sysInfoStr.indexOf("\r\n"))
.split(",");
if (sysInfo.length == sysInfoSplitCount) {
try {
vmemSize = Long.parseLong(sysInfo[0]);
memSize = Long.parseLong(sysInfo[1]);
vmemAvailable = Long.parseLong(sysInfo[2]);
memAvailable = Long.parseLong(sysInfo[3]);
numProcessors = Integer.parseInt(sysInfo[4]);
cpuFrequencyKhz = Long.parseLong(sysInfo[5]);
cumulativeCpuTimeMs = Long.parseLong(sysInfo[6]);
if (lastCumCpuTimeMs != -1) {
cpuUsage = (cumulativeCpuTimeMs - lastCumCpuTimeMs)
/ (refreshInterval * 1.0f);
}
} catch (NumberFormatException nfe) {
LOG.warn("Error parsing sysInfo", nfe);
}
} else {
LOG.warn("Expected split length of sysInfo to be "
+ sysInfoSplitCount + ". Got " + sysInfo.length);
}
}
}
}
/** {@inheritDoc} */
@Override
public long getVirtualMemorySize() {
refreshIfNeeded();
return vmemSize;
}
/** {@inheritDoc} */
@Override
public long getPhysicalMemorySize() {
refreshIfNeeded();
return memSize;
}
/** {@inheritDoc} */
@Override
public long getAvailableVirtualMemorySize() {
refreshIfNeeded();
return vmemAvailable;
}
/** {@inheritDoc} */
@Override
public long getAvailablePhysicalMemorySize() {
refreshIfNeeded();
return memAvailable;
}
/** {@inheritDoc} */
@Override
public int getNumProcessors() {
refreshIfNeeded();
return numProcessors;
}
/** {@inheritDoc} */
@Override
public int getNumCores() {
return getNumProcessors();
}
/** {@inheritDoc} */
@Override
public long getCpuFrequency() {
refreshIfNeeded();
return cpuFrequencyKhz;
}
/** {@inheritDoc} */
@Override
public long getCumulativeCpuTime() {
refreshIfNeeded();
return cumulativeCpuTimeMs;
}
/** {@inheritDoc} */
@Override
public float getCpuUsage() {
refreshIfNeeded();
return cpuUsage;
}
}

View File

@ -16,12 +16,11 @@
* limitations under the License.
*/
package org.apache.hadoop.yarn.util;
package org.apache.hadoop.util;
import java.io.File;
import java.io.FileWriter;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Random;
import org.apache.commons.io.IOUtils;
@ -31,30 +30,30 @@ import org.junit.Test;
import static org.junit.Assert.assertEquals;
/**
* A JUnit test to test {@link LinuxResourceCalculatorPlugin}
* A JUnit test to test {@link SysInfoLinux}
* Create the fake /proc/ information and verify the parsing and calculation
*/
public class TestLinuxResourceCalculatorPlugin {
public class TestSysInfoLinux {
/**
* LinuxResourceCalculatorPlugin with a fake timer
*/
static class FakeLinuxResourceCalculatorPlugin extends
LinuxResourceCalculatorPlugin {
long currentTime = 0;
public FakeLinuxResourceCalculatorPlugin(String procfsMemFile,
String procfsCpuFile,
String procfsStatFile,
long jiffyLengthInMillis) {
super(procfsMemFile, procfsCpuFile, procfsStatFile, jiffyLengthInMillis);
}
@Override
long getCurrentTime() {
return currentTime;
}
public void advanceTime(long adv) {
currentTime += adv * this.getJiffyLengthInMillis();
}
SysInfoLinux {
long currentTime = 0;
public FakeLinuxResourceCalculatorPlugin(String procfsMemFile,
String procfsCpuFile,
String procfsStatFile,
long jiffyLengthInMillis) {
super(procfsMemFile, procfsCpuFile, procfsStatFile, jiffyLengthInMillis);
}
@Override
long getCurrentTime() {
return currentTime;
}
public void advanceTime(long adv) {
currentTime += adv * this.getJiffyLengthInMillis();
}
}
private static final FakeLinuxResourceCalculatorPlugin plugin;
private static String TEST_ROOT_DIR = new Path(System.getProperty(
@ -72,36 +71,36 @@ public class TestLinuxResourceCalculatorPlugin {
FAKE_STATFILE,
FAKE_JIFFY_LENGTH);
}
static final String MEMINFO_FORMAT =
"MemTotal: %d kB\n" +
"MemFree: %d kB\n" +
"Buffers: 138244 kB\n" +
"Cached: 947780 kB\n" +
"SwapCached: 142880 kB\n" +
"Active: 3229888 kB\n" +
"Inactive: %d kB\n" +
"SwapTotal: %d kB\n" +
"SwapFree: %d kB\n" +
"Dirty: 122012 kB\n" +
"Writeback: 0 kB\n" +
"AnonPages: 2710792 kB\n" +
"Mapped: 24740 kB\n" +
"Slab: 132528 kB\n" +
"SReclaimable: 105096 kB\n" +
"SUnreclaim: 27432 kB\n" +
"PageTables: 11448 kB\n" +
"NFS_Unstable: 0 kB\n" +
"Bounce: 0 kB\n" +
"CommitLimit: 4125904 kB\n" +
"Committed_AS: 4143556 kB\n" +
"VmallocTotal: 34359738367 kB\n" +
"VmallocUsed: 1632 kB\n" +
"VmallocChunk: 34359736375 kB\n" +
"HugePages_Total: 0\n" +
"HugePages_Free: 0\n" +
"HugePages_Rsvd: 0\n" +
"Hugepagesize: 2048 kB";
static final String MEMINFO_FORMAT =
"MemTotal: %d kB\n" +
"MemFree: %d kB\n" +
"Buffers: 138244 kB\n" +
"Cached: 947780 kB\n" +
"SwapCached: 142880 kB\n" +
"Active: 3229888 kB\n" +
"Inactive: %d kB\n" +
"SwapTotal: %d kB\n" +
"SwapFree: %d kB\n" +
"Dirty: 122012 kB\n" +
"Writeback: 0 kB\n" +
"AnonPages: 2710792 kB\n" +
"Mapped: 24740 kB\n" +
"Slab: 132528 kB\n" +
"SReclaimable: 105096 kB\n" +
"SUnreclaim: 27432 kB\n" +
"PageTables: 11448 kB\n" +
"NFS_Unstable: 0 kB\n" +
"Bounce: 0 kB\n" +
"CommitLimit: 4125904 kB\n" +
"Committed_AS: 4143556 kB\n" +
"VmallocTotal: 34359738367 kB\n" +
"VmallocUsed: 1632 kB\n" +
"VmallocChunk: 34359736375 kB\n" +
"HugePages_Total: 0\n" +
"HugePages_Free: 0\n" +
"HugePages_Rsvd: 0\n" +
"Hugepagesize: 2048 kB";
static final String CPUINFO_FORMAT =
"processor : %s\n" +
"vendor_id : AuthenticAMD\n" +
@ -128,8 +127,8 @@ public class TestLinuxResourceCalculatorPlugin {
"cache_alignment : 64\n" +
"address sizes : 40 bits physical, 48 bits virtual\n" +
"power management: ts fid vid ttp";
static final String STAT_FILE_FORMAT =
static final String STAT_FILE_FORMAT =
"cpu %d %d %d 1646495089 831319 48713 164346 0\n" +
"cpu0 15096055 30805 3823005 411456015 206027 13 14269 0\n" +
"cpu1 14760561 89890 6432036 408707910 456857 48074 130857 0\n" +
@ -141,7 +140,7 @@ public class TestLinuxResourceCalculatorPlugin {
"processes 26414943\n" +
"procs_running 1\n" +
"procs_blocked 0\n";
/**
* Test parsing /proc/stat and /proc/cpuinfo
* @throws IOException
@ -164,7 +163,7 @@ public class TestLinuxResourceCalculatorPlugin {
fWriter.close();
assertEquals(plugin.getNumProcessors(), numProcessors);
assertEquals(plugin.getCpuFrequency(), cpuFrequencyKHz);
// Write fake /proc/stat file.
long uTime = 54972994;
long nTime = 188860;
@ -183,13 +182,13 @@ public class TestLinuxResourceCalculatorPlugin {
assertEquals(plugin.getCumulativeCpuTime(),
FAKE_JIFFY_LENGTH * (uTime + nTime + sTime));
assertEquals(plugin.getCpuUsage(), 6.25F, 0.0);
// Advance the time and sample again. This time, we call getCpuUsage() only.
uTime += 600L;
plugin.advanceTime(300L);
updateStatFile(uTime, nTime, sTime);
assertEquals(plugin.getCpuUsage(), 25F, 0.0);
// Advance very short period of time (one jiffy length).
// In this case, CPU usage should not be updated.
uTime += 1L;
@ -199,7 +198,7 @@ public class TestLinuxResourceCalculatorPlugin {
FAKE_JIFFY_LENGTH * (uTime + nTime + sTime));
assertEquals(plugin.getCpuUsage(), 25F, 0.0); // CPU usage is not updated.
}
/**
* Write information to fake /proc/stat file
*/
@ -209,7 +208,7 @@ public class TestLinuxResourceCalculatorPlugin {
fWriter.write(String.format(STAT_FILE_FORMAT, uTime, nTime, sTime));
fWriter.close();
}
/**
* Test parsing /proc/meminfo
* @throws IOException
@ -226,7 +225,7 @@ public class TestLinuxResourceCalculatorPlugin {
FileWriter fWriter = new FileWriter(FAKE_MEMFILE);
fWriter.write(String.format(MEMINFO_FORMAT,
memTotal, memFree, inactive, swapTotal, swapFree));
fWriter.close();
assertEquals(plugin.getAvailablePhysicalMemorySize(),
1024L * (memFree + inactive));

View File

@ -0,0 +1,100 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.util;
import org.junit.Test;
import static org.junit.Assert.*;
public class TestSysInfoWindows {
static class SysInfoWindowsMock extends SysInfoWindows {
private long time = SysInfoWindows.REFRESH_INTERVAL_MS + 1;
private String infoStr = null;
void setSysinfoString(String infoStr) {
this.infoStr = infoStr;
}
void advance(long dur) {
time += dur;
}
@Override
String getSystemInfoInfoFromShell() {
return infoStr;
}
@Override
long now() {
return time;
}
}
@Test(timeout = 10000)
public void parseSystemInfoString() {
SysInfoWindowsMock tester = new SysInfoWindowsMock();
tester.setSysinfoString(
"17177038848,8589467648,15232745472,6400417792,1,2805000,6261812\r\n");
// info str derived from windows shell command has \r\n termination
assertEquals(17177038848L, tester.getVirtualMemorySize());
assertEquals(8589467648L, tester.getPhysicalMemorySize());
assertEquals(15232745472L, tester.getAvailableVirtualMemorySize());
assertEquals(6400417792L, tester.getAvailablePhysicalMemorySize());
assertEquals(1, tester.getNumProcessors());
assertEquals(1, tester.getNumCores());
assertEquals(2805000L, tester.getCpuFrequency());
assertEquals(6261812L, tester.getCumulativeCpuTime());
// undef on first call
assertEquals(-1.0, tester.getCpuUsage(), 0.0);
}
@Test(timeout = 10000)
public void refreshAndCpuUsage() throws InterruptedException {
SysInfoWindowsMock tester = new SysInfoWindowsMock();
tester.setSysinfoString(
"17177038848,8589467648,15232745472,6400417792,1,2805000,6261812\r\n");
// info str derived from windows shell command has \r\n termination
tester.getAvailablePhysicalMemorySize();
// verify information has been refreshed
assertEquals(6400417792L, tester.getAvailablePhysicalMemorySize());
assertEquals(-1.0, tester.getCpuUsage(), 0.0);
tester.setSysinfoString(
"17177038848,8589467648,15232745472,5400417792,1,2805000,6263012\r\n");
tester.getAvailablePhysicalMemorySize();
// verify information has not been refreshed
assertEquals(6400417792L, tester.getAvailablePhysicalMemorySize());
assertEquals(-1.0, tester.getCpuUsage(), 0.0);
// advance clock
tester.advance(SysInfoWindows.REFRESH_INTERVAL_MS + 1);
// verify information has been refreshed
assertEquals(5400417792L, tester.getAvailablePhysicalMemorySize());
assertEquals((6263012 - 6261812) / (SysInfoWindows.REFRESH_INTERVAL_MS + 1f),
tester.getCpuUsage(), 0.0);
}
@Test(timeout = 10000)
public void errorInGetSystemInfo() {
SysInfoWindowsMock tester = new SysInfoWindowsMock();
// info str derived from windows shell command has \r\n termination
tester.setSysinfoString(null);
// call a method to refresh values
tester.getAvailablePhysicalMemorySize();
}
}

View File

@ -15,25 +15,11 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.util;
import java.io.BufferedReader;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.InputStreamReader;
import java.io.IOException;
import java.math.BigInteger;
import java.nio.charset.Charset;
import java.util.HashSet;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import com.google.common.annotations.VisibleForTesting;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.util.SysInfoLinux;
/**
* Plugin to calculate resource information on Linux systems.
@ -41,383 +27,9 @@ import org.apache.hadoop.classification.InterfaceStability;
@InterfaceAudience.Private
@InterfaceStability.Unstable
public class LinuxResourceCalculatorPlugin extends ResourceCalculatorPlugin {
private static final Log LOG =
LogFactory.getLog(LinuxResourceCalculatorPlugin.class);
/**
* proc's meminfo virtual file has keys-values in the format
* "key:[ \t]*value[ \t]kB".
*/
private static final String PROCFS_MEMFILE = "/proc/meminfo";
private static final Pattern PROCFS_MEMFILE_FORMAT =
Pattern.compile("^([a-zA-Z]*):[ \t]*([0-9]*)[ \t]kB");
// We need the values for the following keys in meminfo
private static final String MEMTOTAL_STRING = "MemTotal";
private static final String SWAPTOTAL_STRING = "SwapTotal";
private static final String MEMFREE_STRING = "MemFree";
private static final String SWAPFREE_STRING = "SwapFree";
private static final String INACTIVE_STRING = "Inactive";
/**
* Patterns for parsing /proc/cpuinfo.
*/
private static final String PROCFS_CPUINFO = "/proc/cpuinfo";
private static final Pattern PROCESSOR_FORMAT =
Pattern.compile("^processor[ \t]:[ \t]*([0-9]*)");
private static final Pattern FREQUENCY_FORMAT =
Pattern.compile("^cpu MHz[ \t]*:[ \t]*([0-9.]*)");
private static final Pattern PHYSICAL_ID_FORMAT =
Pattern.compile("^physical id[ \t]*:[ \t]*([0-9]*)");
private static final Pattern CORE_ID_FORMAT =
Pattern.compile("^core id[ \t]*:[ \t]*([0-9]*)");
/**
* Pattern for parsing /proc/stat.
*/
private static final String PROCFS_STAT = "/proc/stat";
private static final Pattern CPU_TIME_FORMAT =
Pattern.compile("^cpu[ \t]*([0-9]*)" +
"[ \t]*([0-9]*)[ \t]*([0-9]*)[ \t].*");
private CpuTimeTracker cpuTimeTracker;
private String procfsMemFile;
private String procfsCpuFile;
private String procfsStatFile;
private long jiffyLengthInMillis;
private long ramSize = 0;
private long swapSize = 0;
private long ramSizeFree = 0; // free ram space on the machine (kB)
private long swapSizeFree = 0; // free swap space on the machine (kB)
private long inactiveSize = 0; // inactive cache memory (kB)
/* number of logical processors on the system. */
private int numProcessors = 0;
/* number of physical cores on the system. */
private int numCores = 0;
private long cpuFrequency = 0L; // CPU frequency on the system (kHz)
private boolean readMemInfoFile = false;
private boolean readCpuInfoFile = false;
/**
* Get current time.
* @return Unix time stamp in millisecond
*/
long getCurrentTime() {
return System.currentTimeMillis();
}
public LinuxResourceCalculatorPlugin() {
this(PROCFS_MEMFILE, PROCFS_CPUINFO, PROCFS_STAT,
ProcfsBasedProcessTree.JIFFY_LENGTH_IN_MILLIS);
super(new SysInfoLinux());
}
/**
* Constructor which allows assigning the /proc/ directories. This will be
* used only in unit tests.
* @param procfsMemFile fake file for /proc/meminfo
* @param procfsCpuFile fake file for /proc/cpuinfo
* @param procfsStatFile fake file for /proc/stat
* @param jiffyLengthInMillis fake jiffy length value
*/
public LinuxResourceCalculatorPlugin(String procfsMemFile,
String procfsCpuFile,
String procfsStatFile,
long jiffyLengthInMillis) {
this.procfsMemFile = procfsMemFile;
this.procfsCpuFile = procfsCpuFile;
this.procfsStatFile = procfsStatFile;
this.jiffyLengthInMillis = jiffyLengthInMillis;
this.cpuTimeTracker = new CpuTimeTracker(jiffyLengthInMillis);
}
/**
* Read /proc/meminfo, parse and compute memory information only once.
*/
private void readProcMemInfoFile() {
readProcMemInfoFile(false);
}
/**
* Read /proc/meminfo, parse and compute memory information.
* @param readAgain if false, read only on the first time
*/
private void readProcMemInfoFile(boolean readAgain) {
if (readMemInfoFile && !readAgain) {
return;
}
// Read "/proc/memInfo" file
BufferedReader in;
InputStreamReader fReader;
try {
fReader = new InputStreamReader(
new FileInputStream(procfsMemFile), Charset.forName("UTF-8"));
in = new BufferedReader(fReader);
} catch (FileNotFoundException f) {
// shouldn't happen....
LOG.warn("Couldn't read " + procfsMemFile
+ "; can't determine memory settings");
return;
}
Matcher mat;
try {
String str = in.readLine();
while (str != null) {
mat = PROCFS_MEMFILE_FORMAT.matcher(str);
if (mat.find()) {
if (mat.group(1).equals(MEMTOTAL_STRING)) {
ramSize = Long.parseLong(mat.group(2));
} else if (mat.group(1).equals(SWAPTOTAL_STRING)) {
swapSize = Long.parseLong(mat.group(2));
} else if (mat.group(1).equals(MEMFREE_STRING)) {
ramSizeFree = Long.parseLong(mat.group(2));
} else if (mat.group(1).equals(SWAPFREE_STRING)) {
swapSizeFree = Long.parseLong(mat.group(2));
} else if (mat.group(1).equals(INACTIVE_STRING)) {
inactiveSize = Long.parseLong(mat.group(2));
}
}
str = in.readLine();
}
} catch (IOException io) {
LOG.warn("Error reading the stream " + io);
} finally {
// Close the streams
try {
fReader.close();
try {
in.close();
} catch (IOException i) {
LOG.warn("Error closing the stream " + in);
}
} catch (IOException i) {
LOG.warn("Error closing the stream " + fReader);
}
}
readMemInfoFile = true;
}
/**
* Read /proc/cpuinfo, parse and calculate CPU information.
*/
private void readProcCpuInfoFile() {
// This directory needs to be read only once
if (readCpuInfoFile) {
return;
}
HashSet<String> coreIdSet = new HashSet<>();
// Read "/proc/cpuinfo" file
BufferedReader in;
InputStreamReader fReader;
try {
fReader = new InputStreamReader(
new FileInputStream(procfsCpuFile), Charset.forName("UTF-8"));
in = new BufferedReader(fReader);
} catch (FileNotFoundException f) {
// shouldn't happen....
LOG.warn("Couldn't read " + procfsCpuFile + "; can't determine cpu info");
return;
}
Matcher mat;
try {
numProcessors = 0;
numCores = 1;
String currentPhysicalId = "";
String str = in.readLine();
while (str != null) {
mat = PROCESSOR_FORMAT.matcher(str);
if (mat.find()) {
numProcessors++;
}
mat = FREQUENCY_FORMAT.matcher(str);
if (mat.find()) {
cpuFrequency = (long)(Double.parseDouble(mat.group(1)) * 1000); // kHz
}
mat = PHYSICAL_ID_FORMAT.matcher(str);
if (mat.find()) {
currentPhysicalId = str;
}
mat = CORE_ID_FORMAT.matcher(str);
if (mat.find()) {
coreIdSet.add(currentPhysicalId + " " + str);
numCores = coreIdSet.size();
}
str = in.readLine();
}
} catch (IOException io) {
LOG.warn("Error reading the stream " + io);
} finally {
// Close the streams
try {
fReader.close();
try {
in.close();
} catch (IOException i) {
LOG.warn("Error closing the stream " + in);
}
} catch (IOException i) {
LOG.warn("Error closing the stream " + fReader);
}
}
readCpuInfoFile = true;
}
/**
* Read /proc/stat file, parse and calculate cumulative CPU.
*/
private void readProcStatFile() {
// Read "/proc/stat" file
BufferedReader in;
InputStreamReader fReader;
try {
fReader = new InputStreamReader(
new FileInputStream(procfsStatFile), Charset.forName("UTF-8"));
in = new BufferedReader(fReader);
} catch (FileNotFoundException f) {
// shouldn't happen....
return;
}
Matcher mat;
try {
String str = in.readLine();
while (str != null) {
mat = CPU_TIME_FORMAT.matcher(str);
if (mat.find()) {
long uTime = Long.parseLong(mat.group(1));
long nTime = Long.parseLong(mat.group(2));
long sTime = Long.parseLong(mat.group(3));
cpuTimeTracker.updateElapsedJiffies(
BigInteger.valueOf(uTime + nTime + sTime),
getCurrentTime());
break;
}
str = in.readLine();
}
} catch (IOException io) {
LOG.warn("Error reading the stream " + io);
} finally {
// Close the streams
try {
fReader.close();
try {
in.close();
} catch (IOException i) {
LOG.warn("Error closing the stream " + in);
}
} catch (IOException i) {
LOG.warn("Error closing the stream " + fReader);
}
}
}
/** {@inheritDoc} */
@Override
public long getPhysicalMemorySize() {
readProcMemInfoFile();
return ramSize * 1024;
}
/** {@inheritDoc} */
@Override
public long getVirtualMemorySize() {
readProcMemInfoFile();
return (ramSize + swapSize) * 1024;
}
/** {@inheritDoc} */
@Override
public long getAvailablePhysicalMemorySize() {
readProcMemInfoFile(true);
return (ramSizeFree + inactiveSize) * 1024;
}
/** {@inheritDoc} */
@Override
public long getAvailableVirtualMemorySize() {
readProcMemInfoFile(true);
return (ramSizeFree + swapSizeFree + inactiveSize) * 1024;
}
/** {@inheritDoc} */
@Override
public int getNumProcessors() {
readProcCpuInfoFile();
return numProcessors;
}
/** {@inheritDoc} */
@Override
public int getNumCores() {
readProcCpuInfoFile();
return numCores;
}
/** {@inheritDoc} */
@Override
public long getCpuFrequency() {
readProcCpuInfoFile();
return cpuFrequency;
}
/** {@inheritDoc} */
@Override
public long getCumulativeCpuTime() {
readProcStatFile();
return cpuTimeTracker.cumulativeCpuTime.longValue();
}
/** {@inheritDoc} */
@Override
public float getCpuUsage() {
readProcStatFile();
float overallCpuUsage = cpuTimeTracker.getCpuTrackerUsagePercent();
if (overallCpuUsage != CpuTimeTracker.UNAVAILABLE) {
overallCpuUsage = overallCpuUsage / getNumProcessors();
}
return overallCpuUsage;
}
/**
* Test the {@link LinuxResourceCalculatorPlugin}.
*
* @param args - arguments to this calculator test
*/
public static void main(String[] args) {
LinuxResourceCalculatorPlugin plugin = new LinuxResourceCalculatorPlugin();
System.out.println("Physical memory Size (bytes) : "
+ plugin.getPhysicalMemorySize());
System.out.println("Total Virtual memory Size (bytes) : "
+ plugin.getVirtualMemorySize());
System.out.println("Available Physical memory Size (bytes) : "
+ plugin.getAvailablePhysicalMemorySize());
System.out.println("Total Available Virtual memory Size (bytes) : "
+ plugin.getAvailableVirtualMemorySize());
System.out.println("Number of Processors : " + plugin.getNumProcessors());
System.out.println("CPU frequency (kHz) : " + plugin.getCpuFrequency());
System.out.println("Cumulative CPU time (ms) : " +
plugin.getCumulativeCpuTime());
try {
// Sleep so we can compute the CPU usage
Thread.sleep(500L);
} catch (InterruptedException e) {
// do nothing
}
System.out.println("CPU usage % : " + plugin.getCpuUsage());
}
@VisibleForTesting
void setReadCpuInfoFile(boolean readCpuInfoFileValue) {
this.readCpuInfoFile = readCpuInfoFileValue;
}
public long getJiffyLengthInMillis() {
return this.jiffyLengthInMillis;
}
}

View File

@ -40,9 +40,9 @@ import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.util.CpuTimeTracker;
import org.apache.hadoop.util.Shell;
import org.apache.hadoop.util.Shell.ShellCommandExecutor;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.util.SysInfoLinux;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
/**
@ -64,8 +64,9 @@ public class ProcfsBasedProcessTree extends ResourceCalculatorProcessTree {
public static final String PROCFS_STAT_FILE = "stat";
public static final String PROCFS_CMDLINE_FILE = "cmdline";
public static final long PAGE_SIZE;
public static final long JIFFY_LENGTH_IN_MILLIS; // in millisecond
public static final long PAGE_SIZE = SysInfoLinux.PAGE_SIZE;
public static final long JIFFY_LENGTH_IN_MILLIS =
SysInfoLinux.JIFFY_LENGTH_IN_MILLIS; // in millisecond
private final CpuTimeTracker cpuTimeTracker;
private Clock clock;
@ -108,31 +109,6 @@ public class ProcfsBasedProcessTree extends ResourceCalculatorProcessTree {
protected Map<String, ProcessTreeSmapMemInfo> processSMAPTree =
new HashMap<String, ProcessTreeSmapMemInfo>();
static {
long jiffiesPerSecond = -1;
long pageSize = -1;
try {
if(Shell.LINUX) {
ShellCommandExecutor shellExecutorClk = new ShellCommandExecutor(
new String[] { "getconf", "CLK_TCK" });
shellExecutorClk.execute();
jiffiesPerSecond = Long.parseLong(shellExecutorClk.getOutput().replace("\n", ""));
ShellCommandExecutor shellExecutorPage = new ShellCommandExecutor(
new String[] { "getconf", "PAGESIZE" });
shellExecutorPage.execute();
pageSize = Long.parseLong(shellExecutorPage.getOutput().replace("\n", ""));
}
} catch (IOException e) {
LOG.error(StringUtils.stringifyException(e));
} finally {
JIFFY_LENGTH_IN_MILLIS = jiffiesPerSecond != -1 ?
Math.round(1000D / jiffiesPerSecond) : -1;
PAGE_SIZE = pageSize;
}
}
// to enable testing, using this variable which can be configured
// to a test directory.
private String procfsDir;

View File

@ -15,7 +15,6 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.util;
import org.apache.hadoop.classification.InterfaceAudience;
@ -23,29 +22,42 @@ import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hadoop.util.Shell;
import org.apache.hadoop.util.SysInfo;
/**
* Plugin to calculate resource information on the system.
*
*/
@InterfaceAudience.LimitedPrivate({"YARN", "MAPREDUCE"})
@InterfaceStability.Unstable
public abstract class ResourceCalculatorPlugin extends Configured {
public class ResourceCalculatorPlugin extends Configured {
private final SysInfo sys;
protected ResourceCalculatorPlugin() {
this(SysInfo.newInstance());
}
public ResourceCalculatorPlugin(SysInfo sys) {
this.sys = sys;
}
/**
* Obtain the total size of the virtual memory present in the system.
*
* @return virtual memory size in bytes.
*/
public abstract long getVirtualMemorySize();
public long getVirtualMemorySize() {
return sys.getVirtualMemorySize();
}
/**
* Obtain the total size of the physical memory present in the system.
*
* @return physical memory size bytes.
*/
public abstract long getPhysicalMemorySize();
public long getPhysicalMemorySize() {
return sys.getPhysicalMemorySize();
}
/**
* Obtain the total size of the available virtual memory present
@ -53,7 +65,9 @@ public abstract class ResourceCalculatorPlugin extends Configured {
*
* @return available virtual memory size in bytes.
*/
public abstract long getAvailableVirtualMemorySize();
public long getAvailableVirtualMemorySize() {
return sys.getAvailableVirtualMemorySize();
}
/**
* Obtain the total size of the available physical memory present
@ -61,42 +75,54 @@ public abstract class ResourceCalculatorPlugin extends Configured {
*
* @return available physical memory size bytes.
*/
public abstract long getAvailablePhysicalMemorySize();
public long getAvailablePhysicalMemorySize() {
return sys.getAvailablePhysicalMemorySize();
}
/**
* Obtain the total number of logical processors present on the system.
*
* @return number of logical processors
*/
public abstract int getNumProcessors();
public int getNumProcessors() {
return sys.getNumProcessors();
}
/**
* Obtain total number of physical cores present on the system.
*
* @return number of physical cores
*/
public abstract int getNumCores();
public int getNumCores() {
return sys.getNumCores();
}
/**
* Obtain the CPU frequency of on the system.
*
* @return CPU frequency in kHz
*/
public abstract long getCpuFrequency();
public long getCpuFrequency() {
return sys.getCpuFrequency();
}
/**
* Obtain the cumulative CPU time since the system is on.
*
* @return cumulative CPU time in milliseconds
*/
public abstract long getCumulativeCpuTime();
public long getCumulativeCpuTime() {
return sys.getCumulativeCpuTime();
}
/**
* Obtain the CPU usage % of the machine. Return -1 if it is unavailable
*
* @return CPU usage in %
*/
public abstract float getCpuUsage();
public float getCpuUsage() {
return sys.getCpuUsage();
}
/**
* Create the ResourceCalculatorPlugin from the class name and configure it. If
@ -114,21 +140,11 @@ public abstract class ResourceCalculatorPlugin extends Configured {
if (clazz != null) {
return ReflectionUtils.newInstance(clazz, conf);
}
// No class given, try a os specific class
try {
if (Shell.LINUX) {
return new LinuxResourceCalculatorPlugin();
}
if (Shell.WINDOWS) {
return new WindowsResourceCalculatorPlugin();
}
} catch (SecurityException se) {
// Failed to get Operating System name.
return new ResourceCalculatorPlugin();
} catch (SecurityException e) {
return null;
}
// Not supported on this system.
return null;
}
}

View File

@ -229,7 +229,7 @@ public class WindowsBasedProcessTree extends ResourceCalculatorProcessTree {
@Override
public float getCpuUsagePercent() {
return CpuTimeTracker.UNAVAILABLE;
return UNAVAILABLE;
}
}

View File

@ -15,162 +15,18 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.util;
import java.io.IOException;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.util.SysInfoWindows;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.util.Shell;
import org.apache.hadoop.util.Shell.ShellCommandExecutor;
import org.apache.hadoop.util.StringUtils;
@Private
@InterfaceAudience.Private
@InterfaceStability.Unstable
public class WindowsResourceCalculatorPlugin extends ResourceCalculatorPlugin {
static final Log LOG = LogFactory
.getLog(WindowsResourceCalculatorPlugin.class);
long vmemSize;
long memSize;
long vmemAvailable;
long memAvailable;
int numProcessors;
long cpuFrequencyKhz;
long cumulativeCpuTimeMs;
float cpuUsage;
long lastRefreshTime;
private final int refreshIntervalMs = 1000;
WindowsBasedProcessTree pTree = null;
public WindowsResourceCalculatorPlugin() {
lastRefreshTime = 0;
reset();
}
void reset() {
vmemSize = -1;
memSize = -1;
vmemAvailable = -1;
memAvailable = -1;
numProcessors = -1;
cpuFrequencyKhz = -1;
cumulativeCpuTimeMs = -1;
cpuUsage = -1;
super(new SysInfoWindows());
}
String getSystemInfoInfoFromShell() {
ShellCommandExecutor shellExecutor = new ShellCommandExecutor(
new String[] { Shell.WINUTILS, "systeminfo" });
try {
shellExecutor.execute();
return shellExecutor.getOutput();
} catch (IOException e) {
LOG.error(StringUtils.stringifyException(e));
}
return null;
}
void refreshIfNeeded() {
long now = System.currentTimeMillis();
if (now - lastRefreshTime > refreshIntervalMs) {
long refreshInterval = now - lastRefreshTime;
lastRefreshTime = now;
long lastCumCpuTimeMs = cumulativeCpuTimeMs;
reset();
String sysInfoStr = getSystemInfoInfoFromShell();
if (sysInfoStr != null) {
final int sysInfoSplitCount = 7;
String[] sysInfo = sysInfoStr.substring(0, sysInfoStr.indexOf("\r\n"))
.split(",");
if (sysInfo.length == sysInfoSplitCount) {
try {
vmemSize = Long.parseLong(sysInfo[0]);
memSize = Long.parseLong(sysInfo[1]);
vmemAvailable = Long.parseLong(sysInfo[2]);
memAvailable = Long.parseLong(sysInfo[3]);
numProcessors = Integer.parseInt(sysInfo[4]);
cpuFrequencyKhz = Long.parseLong(sysInfo[5]);
cumulativeCpuTimeMs = Long.parseLong(sysInfo[6]);
if (lastCumCpuTimeMs != -1) {
cpuUsage = (cumulativeCpuTimeMs - lastCumCpuTimeMs)
/ (refreshInterval * 1.0f);
}
} catch (NumberFormatException nfe) {
LOG.warn("Error parsing sysInfo." + nfe);
}
} else {
LOG.warn("Expected split length of sysInfo to be "
+ sysInfoSplitCount + ". Got " + sysInfo.length);
}
}
}
}
/** {@inheritDoc} */
@Override
public long getVirtualMemorySize() {
refreshIfNeeded();
return vmemSize;
}
/** {@inheritDoc} */
@Override
public long getPhysicalMemorySize() {
refreshIfNeeded();
return memSize;
}
/** {@inheritDoc} */
@Override
public long getAvailableVirtualMemorySize() {
refreshIfNeeded();
return vmemAvailable;
}
/** {@inheritDoc} */
@Override
public long getAvailablePhysicalMemorySize() {
refreshIfNeeded();
return memAvailable;
}
/** {@inheritDoc} */
@Override
public int getNumProcessors() {
refreshIfNeeded();
return numProcessors;
}
/** {@inheritDoc} */
@Override
public int getNumCores() {
return getNumProcessors();
}
/** {@inheritDoc} */
@Override
public long getCpuFrequency() {
refreshIfNeeded();
return cpuFrequencyKhz;
}
/** {@inheritDoc} */
@Override
public long getCumulativeCpuTime() {
refreshIfNeeded();
return cumulativeCpuTimeMs;
}
/** {@inheritDoc} */
@Override
public float getCpuUsage() {
refreshIfNeeded();
return cpuUsage;
}
}

View File

@ -65,7 +65,7 @@ public class TestResourceCalculatorProcessTree {
@Override
public float getCpuUsagePercent() {
return CpuTimeTracker.UNAVAILABLE;
return UNAVAILABLE;
}
public boolean checkPidPgrpidForMatch() {

View File

@ -1,86 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.util;
import org.junit.Test;
import static org.junit.Assert.assertTrue;
public class TestWindowsResourceCalculatorPlugin {
class WindowsResourceCalculatorPluginTester extends WindowsResourceCalculatorPlugin {
private String infoStr = null;
@Override
String getSystemInfoInfoFromShell() {
return infoStr;
}
}
@Test (timeout = 30000)
public void parseSystemInfoString() {
WindowsResourceCalculatorPluginTester tester = new WindowsResourceCalculatorPluginTester();
// info str derived from windows shell command has \r\n termination
tester.infoStr = "17177038848,8589467648,15232745472,6400417792,1,2805000,6261812\r\n";
// call a method to refresh values
tester.getAvailablePhysicalMemorySize();
// verify information has been refreshed
assertTrue(tester.vmemSize == 17177038848L);
assertTrue(tester.memSize == 8589467648L);
assertTrue(tester.vmemAvailable == 15232745472L);
assertTrue(tester.memAvailable == 6400417792L);
assertTrue(tester.numProcessors == 1);
assertTrue(tester.cpuFrequencyKhz == 2805000L);
assertTrue(tester.cumulativeCpuTimeMs == 6261812L);
assertTrue(tester.cpuUsage == -1);
}
@Test (timeout = 20000)
public void refreshAndCpuUsage() throws InterruptedException {
WindowsResourceCalculatorPluginTester tester = new WindowsResourceCalculatorPluginTester();
// info str derived from windows shell command has \r\n termination
tester.infoStr = "17177038848,8589467648,15232745472,6400417792,1,2805000,6261812\r\n";
tester.getAvailablePhysicalMemorySize();
// verify information has been refreshed
assertTrue(tester.memAvailable == 6400417792L);
assertTrue(tester.cpuUsage == -1);
tester.infoStr = "17177038848,8589467648,15232745472,5400417792,1,2805000,6261812\r\n";
tester.getAvailablePhysicalMemorySize();
// verify information has not been refreshed
assertTrue(tester.memAvailable == 6400417792L);
assertTrue(tester.cpuUsage == -1);
Thread.sleep(1500);
tester.infoStr = "17177038848,8589467648,15232745472,5400417792,1,2805000,6286812\r\n";
tester.getAvailablePhysicalMemorySize();
// verify information has been refreshed
assertTrue(tester.memAvailable == 5400417792L);
assertTrue(tester.cpuUsage >= 0.1);
}
@Test (timeout = 20000)
public void errorInGetSystemInfo() {
WindowsResourceCalculatorPluginTester tester = new WindowsResourceCalculatorPluginTester();
// info str derived from windows shell command has \r\n termination
tester.infoStr = null;
// call a method to refresh values
tester.getAvailablePhysicalMemorySize();
}
}