diff --git a/hadoop-common-project/hadoop-common/CHANGES.txt b/hadoop-common-project/hadoop-common/CHANGES.txt index 42a38518b8a..9e8004b9671 100644 --- a/hadoop-common-project/hadoop-common/CHANGES.txt +++ b/hadoop-common-project/hadoop-common/CHANGES.txt @@ -189,6 +189,9 @@ Release 2.8.0 - UNRELEASED HADOOP-12201. Add tracing to FileSystem#createFileSystem and Globber#glob (cmccabe) + HADOOP-12180. Move ResourceCalculatorPlugin from YARN to Common. + (Chris Douglas via kasha) + OPTIMIZATIONS HADOOP-11785. Reduce the number of listStatus operation in distcp diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/CpuTimeTracker.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/CpuTimeTracker.java similarity index 73% rename from hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/CpuTimeTracker.java rename to hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/CpuTimeTracker.java index b09a4b68400..3f17c9ab113 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/CpuTimeTracker.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/CpuTimeTracker.java @@ -16,38 +16,40 @@ * limitations under the License. */ -package org.apache.hadoop.yarn.util; +package org.apache.hadoop.util; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; import java.math.BigInteger; +/** + * Utility for sampling and computing CPU usage. + */ @InterfaceAudience.Private @InterfaceStability.Unstable public class CpuTimeTracker { - public static final int UNAVAILABLE = - ResourceCalculatorProcessTree.UNAVAILABLE; - final long MINIMUM_UPDATE_INTERVAL; + public static final int UNAVAILABLE = -1; + private final long minimumTimeInterval; // CPU used time since system is on (ms) - BigInteger cumulativeCpuTime = BigInteger.ZERO; + private BigInteger cumulativeCpuTime = BigInteger.ZERO; // CPU used time read last time (ms) - BigInteger lastCumulativeCpuTime = BigInteger.ZERO; + private BigInteger lastCumulativeCpuTime = BigInteger.ZERO; // Unix timestamp while reading the CPU time (ms) - long sampleTime; - long lastSampleTime; - float cpuUsage; - BigInteger jiffyLengthInMillis; + private long sampleTime; + private long lastSampleTime; + private float cpuUsage; + private BigInteger jiffyLengthInMillis; public CpuTimeTracker(long jiffyLengthInMillis) { this.jiffyLengthInMillis = BigInteger.valueOf(jiffyLengthInMillis); this.cpuUsage = UNAVAILABLE; this.sampleTime = UNAVAILABLE; this.lastSampleTime = UNAVAILABLE; - MINIMUM_UPDATE_INTERVAL = 10 * jiffyLengthInMillis; + minimumTimeInterval = 10 * jiffyLengthInMillis; } /** @@ -58,7 +60,7 @@ public class CpuTimeTracker { * * @return Return percentage of cpu usage since last update, {@link * CpuTimeTracker#UNAVAILABLE} if there haven't been 2 updates more than - * {@link CpuTimeTracker#MINIMUM_UPDATE_INTERVAL} apart + * {@link CpuTimeTracker#minimumTimeInterval} apart */ public float getCpuTrackerUsagePercent() { if (lastSampleTime == UNAVAILABLE || @@ -71,7 +73,7 @@ public class CpuTimeTracker { // When lastSampleTime is sufficiently old, update cpuUsage. // Also take a sample of the current time and cumulative CPU time for the // use of the next calculation. - if (sampleTime > lastSampleTime + MINIMUM_UPDATE_INTERVAL) { + if (sampleTime > lastSampleTime + minimumTimeInterval) { cpuUsage = ((cumulativeCpuTime.subtract(lastCumulativeCpuTime)).floatValue()) * 100F / ((float) (sampleTime - lastSampleTime)); @@ -81,9 +83,22 @@ public class CpuTimeTracker { return cpuUsage; } - public void updateElapsedJiffies(BigInteger elapedJiffies, long sampleTime) { - this.cumulativeCpuTime = elapedJiffies.multiply(jiffyLengthInMillis); - this.sampleTime = sampleTime; + /** + * Obtain the cumulative CPU time since the system is on. + * @return cumulative CPU time in milliseconds + */ + public long getCumulativeCpuTime() { + return cumulativeCpuTime.longValue(); + } + + /** + * Apply delta to accumulators. + * @param elapsedJiffies updated jiffies + * @param newTime new sample time + */ + public void updateElapsedJiffies(BigInteger elapsedJiffies, long newTime) { + cumulativeCpuTime = elapsedJiffies.multiply(jiffyLengthInMillis); + sampleTime = newTime; } @Override @@ -97,4 +112,4 @@ public class CpuTimeTracker { sb.append(" JiffyLengthMillisec " + this.jiffyLengthInMillis); return sb.toString(); } -} \ No newline at end of file +} diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/SysInfo.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/SysInfo.java new file mode 100644 index 00000000000..ec7fb240128 --- /dev/null +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/SysInfo.java @@ -0,0 +1,111 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.util; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; + +/** + * Plugin to calculate resource information on the system. + */ +@InterfaceAudience.Public +@InterfaceStability.Evolving +public abstract class SysInfo { + + /** + * Return default OS instance. + * @throws UnsupportedOperationException If cannot determine OS. + * @return Default instance for the detected OS. + */ + public static SysInfo newInstance() { + if (Shell.LINUX) { + return new SysInfoLinux(); + } + if (Shell.WINDOWS) { + return new SysInfoWindows(); + } + throw new UnsupportedOperationException("Could not determine OS"); + } + + /** + * Obtain the total size of the virtual memory present in the system. + * + * @return virtual memory size in bytes. + */ + public abstract long getVirtualMemorySize(); + + /** + * Obtain the total size of the physical memory present in the system. + * + * @return physical memory size bytes. + */ + public abstract long getPhysicalMemorySize(); + + /** + * Obtain the total size of the available virtual memory present + * in the system. + * + * @return available virtual memory size in bytes. + */ + public abstract long getAvailableVirtualMemorySize(); + + /** + * Obtain the total size of the available physical memory present + * in the system. + * + * @return available physical memory size bytes. + */ + public abstract long getAvailablePhysicalMemorySize(); + + /** + * Obtain the total number of logical processors present on the system. + * + * @return number of logical processors + */ + public abstract int getNumProcessors(); + + /** + * Obtain total number of physical cores present on the system. + * + * @return number of physical cores + */ + public abstract int getNumCores(); + + /** + * Obtain the CPU frequency of on the system. + * + * @return CPU frequency in kHz + */ + public abstract long getCpuFrequency(); + + /** + * Obtain the cumulative CPU time since the system is on. + * + * @return cumulative CPU time in milliseconds + */ + public abstract long getCumulativeCpuTime(); + + /** + * Obtain the CPU usage % of the machine. Return -1 if it is unavailable + * + * @return CPU usage as a percentage of available cycles. + */ + public abstract float getCpuUsage(); + +} diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/SysInfoLinux.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/SysInfoLinux.java new file mode 100644 index 00000000000..055298db18d --- /dev/null +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/SysInfoLinux.java @@ -0,0 +1,444 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.util; + +import java.io.BufferedReader; +import java.io.FileInputStream; +import java.io.FileNotFoundException; +import java.io.InputStreamReader; +import java.io.IOException; +import java.math.BigInteger; +import java.nio.charset.Charset; +import java.util.HashSet; +import java.util.regex.Matcher; +import java.util.regex.Pattern; + +import com.google.common.annotations.VisibleForTesting; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.util.Shell.ShellCommandExecutor; + +/** + * Plugin to calculate resource information on Linux systems. + */ +@InterfaceAudience.Private +@InterfaceStability.Evolving +public class SysInfoLinux extends SysInfo { + private static final Log LOG = + LogFactory.getLog(SysInfoLinux.class); + + /** + * proc's meminfo virtual file has keys-values in the format + * "key:[ \t]*value[ \t]kB". + */ + private static final String PROCFS_MEMFILE = "/proc/meminfo"; + private static final Pattern PROCFS_MEMFILE_FORMAT = + Pattern.compile("^([a-zA-Z]*):[ \t]*([0-9]*)[ \t]kB"); + + // We need the values for the following keys in meminfo + private static final String MEMTOTAL_STRING = "MemTotal"; + private static final String SWAPTOTAL_STRING = "SwapTotal"; + private static final String MEMFREE_STRING = "MemFree"; + private static final String SWAPFREE_STRING = "SwapFree"; + private static final String INACTIVE_STRING = "Inactive"; + + /** + * Patterns for parsing /proc/cpuinfo. + */ + private static final String PROCFS_CPUINFO = "/proc/cpuinfo"; + private static final Pattern PROCESSOR_FORMAT = + Pattern.compile("^processor[ \t]:[ \t]*([0-9]*)"); + private static final Pattern FREQUENCY_FORMAT = + Pattern.compile("^cpu MHz[ \t]*:[ \t]*([0-9.]*)"); + private static final Pattern PHYSICAL_ID_FORMAT = + Pattern.compile("^physical id[ \t]*:[ \t]*([0-9]*)"); + private static final Pattern CORE_ID_FORMAT = + Pattern.compile("^core id[ \t]*:[ \t]*([0-9]*)"); + + /** + * Pattern for parsing /proc/stat. + */ + private static final String PROCFS_STAT = "/proc/stat"; + private static final Pattern CPU_TIME_FORMAT = + Pattern.compile("^cpu[ \t]*([0-9]*)" + + "[ \t]*([0-9]*)[ \t]*([0-9]*)[ \t].*"); + private CpuTimeTracker cpuTimeTracker; + + private String procfsMemFile; + private String procfsCpuFile; + private String procfsStatFile; + private long jiffyLengthInMillis; + + private long ramSize = 0; + private long swapSize = 0; + private long ramSizeFree = 0; // free ram space on the machine (kB) + private long swapSizeFree = 0; // free swap space on the machine (kB) + private long inactiveSize = 0; // inactive cache memory (kB) + /* number of logical processors on the system. */ + private int numProcessors = 0; + /* number of physical cores on the system. */ + private int numCores = 0; + private long cpuFrequency = 0L; // CPU frequency on the system (kHz) + + private boolean readMemInfoFile = false; + private boolean readCpuInfoFile = false; + + public static final long PAGE_SIZE = getConf("PAGESIZE"); + public static final long JIFFY_LENGTH_IN_MILLIS = + Math.max(Math.round(1000D / getConf("CLK_TCK")), -1); + + private static long getConf(String attr) { + if(Shell.LINUX) { + try { + ShellCommandExecutor shellExecutorClk = new ShellCommandExecutor( + new String[] {"getconf", attr }); + shellExecutorClk.execute(); + return Long.parseLong(shellExecutorClk.getOutput().replace("\n", "")); + } catch (IOException|NumberFormatException e) { + return -1; + } + } + return -1; + } + + /** + * Get current time. + * @return Unix time stamp in millisecond + */ + long getCurrentTime() { + return System.currentTimeMillis(); + } + + public SysInfoLinux() { + this(PROCFS_MEMFILE, PROCFS_CPUINFO, PROCFS_STAT, + JIFFY_LENGTH_IN_MILLIS); + } + + /** + * Constructor which allows assigning the /proc/ directories. This will be + * used only in unit tests. + * @param procfsMemFile fake file for /proc/meminfo + * @param procfsCpuFile fake file for /proc/cpuinfo + * @param procfsStatFile fake file for /proc/stat + * @param jiffyLengthInMillis fake jiffy length value + */ + @VisibleForTesting + public SysInfoLinux(String procfsMemFile, + String procfsCpuFile, + String procfsStatFile, + long jiffyLengthInMillis) { + this.procfsMemFile = procfsMemFile; + this.procfsCpuFile = procfsCpuFile; + this.procfsStatFile = procfsStatFile; + this.jiffyLengthInMillis = jiffyLengthInMillis; + this.cpuTimeTracker = new CpuTimeTracker(jiffyLengthInMillis); + } + + /** + * Read /proc/meminfo, parse and compute memory information only once. + */ + private void readProcMemInfoFile() { + readProcMemInfoFile(false); + } + + /** + * Read /proc/meminfo, parse and compute memory information. + * @param readAgain if false, read only on the first time + */ + private void readProcMemInfoFile(boolean readAgain) { + + if (readMemInfoFile && !readAgain) { + return; + } + + // Read "/proc/memInfo" file + BufferedReader in; + InputStreamReader fReader; + try { + fReader = new InputStreamReader( + new FileInputStream(procfsMemFile), Charset.forName("UTF-8")); + in = new BufferedReader(fReader); + } catch (FileNotFoundException f) { + // shouldn't happen.... + LOG.warn("Couldn't read " + procfsMemFile + + "; can't determine memory settings"); + return; + } + + Matcher mat; + + try { + String str = in.readLine(); + while (str != null) { + mat = PROCFS_MEMFILE_FORMAT.matcher(str); + if (mat.find()) { + if (mat.group(1).equals(MEMTOTAL_STRING)) { + ramSize = Long.parseLong(mat.group(2)); + } else if (mat.group(1).equals(SWAPTOTAL_STRING)) { + swapSize = Long.parseLong(mat.group(2)); + } else if (mat.group(1).equals(MEMFREE_STRING)) { + ramSizeFree = Long.parseLong(mat.group(2)); + } else if (mat.group(1).equals(SWAPFREE_STRING)) { + swapSizeFree = Long.parseLong(mat.group(2)); + } else if (mat.group(1).equals(INACTIVE_STRING)) { + inactiveSize = Long.parseLong(mat.group(2)); + } + } + str = in.readLine(); + } + } catch (IOException io) { + LOG.warn("Error reading the stream " + io); + } finally { + // Close the streams + try { + fReader.close(); + try { + in.close(); + } catch (IOException i) { + LOG.warn("Error closing the stream " + in); + } + } catch (IOException i) { + LOG.warn("Error closing the stream " + fReader); + } + } + + readMemInfoFile = true; + } + + /** + * Read /proc/cpuinfo, parse and calculate CPU information. + */ + private void readProcCpuInfoFile() { + // This directory needs to be read only once + if (readCpuInfoFile) { + return; + } + HashSet coreIdSet = new HashSet<>(); + // Read "/proc/cpuinfo" file + BufferedReader in; + InputStreamReader fReader; + try { + fReader = new InputStreamReader( + new FileInputStream(procfsCpuFile), Charset.forName("UTF-8")); + in = new BufferedReader(fReader); + } catch (FileNotFoundException f) { + // shouldn't happen.... + LOG.warn("Couldn't read " + procfsCpuFile + "; can't determine cpu info"); + return; + } + Matcher mat; + try { + numProcessors = 0; + numCores = 1; + String currentPhysicalId = ""; + String str = in.readLine(); + while (str != null) { + mat = PROCESSOR_FORMAT.matcher(str); + if (mat.find()) { + numProcessors++; + } + mat = FREQUENCY_FORMAT.matcher(str); + if (mat.find()) { + cpuFrequency = (long)(Double.parseDouble(mat.group(1)) * 1000); // kHz + } + mat = PHYSICAL_ID_FORMAT.matcher(str); + if (mat.find()) { + currentPhysicalId = str; + } + mat = CORE_ID_FORMAT.matcher(str); + if (mat.find()) { + coreIdSet.add(currentPhysicalId + " " + str); + numCores = coreIdSet.size(); + } + str = in.readLine(); + } + } catch (IOException io) { + LOG.warn("Error reading the stream " + io); + } finally { + // Close the streams + try { + fReader.close(); + try { + in.close(); + } catch (IOException i) { + LOG.warn("Error closing the stream " + in); + } + } catch (IOException i) { + LOG.warn("Error closing the stream " + fReader); + } + } + readCpuInfoFile = true; + } + + /** + * Read /proc/stat file, parse and calculate cumulative CPU. + */ + private void readProcStatFile() { + // Read "/proc/stat" file + BufferedReader in; + InputStreamReader fReader; + try { + fReader = new InputStreamReader( + new FileInputStream(procfsStatFile), Charset.forName("UTF-8")); + in = new BufferedReader(fReader); + } catch (FileNotFoundException f) { + // shouldn't happen.... + return; + } + + Matcher mat; + try { + String str = in.readLine(); + while (str != null) { + mat = CPU_TIME_FORMAT.matcher(str); + if (mat.find()) { + long uTime = Long.parseLong(mat.group(1)); + long nTime = Long.parseLong(mat.group(2)); + long sTime = Long.parseLong(mat.group(3)); + cpuTimeTracker.updateElapsedJiffies( + BigInteger.valueOf(uTime + nTime + sTime), + getCurrentTime()); + break; + } + str = in.readLine(); + } + } catch (IOException io) { + LOG.warn("Error reading the stream " + io); + } finally { + // Close the streams + try { + fReader.close(); + try { + in.close(); + } catch (IOException i) { + LOG.warn("Error closing the stream " + in); + } + } catch (IOException i) { + LOG.warn("Error closing the stream " + fReader); + } + } + } + + /** {@inheritDoc} */ + @Override + public long getPhysicalMemorySize() { + readProcMemInfoFile(); + return ramSize * 1024; + } + + /** {@inheritDoc} */ + @Override + public long getVirtualMemorySize() { + readProcMemInfoFile(); + return (ramSize + swapSize) * 1024; + } + + /** {@inheritDoc} */ + @Override + public long getAvailablePhysicalMemorySize() { + readProcMemInfoFile(true); + return (ramSizeFree + inactiveSize) * 1024; + } + + /** {@inheritDoc} */ + @Override + public long getAvailableVirtualMemorySize() { + readProcMemInfoFile(true); + return (ramSizeFree + swapSizeFree + inactiveSize) * 1024; + } + + /** {@inheritDoc} */ + @Override + public int getNumProcessors() { + readProcCpuInfoFile(); + return numProcessors; + } + + /** {@inheritDoc} */ + @Override + public int getNumCores() { + readProcCpuInfoFile(); + return numCores; + } + + /** {@inheritDoc} */ + @Override + public long getCpuFrequency() { + readProcCpuInfoFile(); + return cpuFrequency; + } + + /** {@inheritDoc} */ + @Override + public long getCumulativeCpuTime() { + readProcStatFile(); + return cpuTimeTracker.getCumulativeCpuTime(); + } + + /** {@inheritDoc} */ + @Override + public float getCpuUsage() { + readProcStatFile(); + float overallCpuUsage = cpuTimeTracker.getCpuTrackerUsagePercent(); + if (overallCpuUsage != CpuTimeTracker.UNAVAILABLE) { + overallCpuUsage = overallCpuUsage / getNumProcessors(); + } + return overallCpuUsage; + } + + /** + * Test the {@link SysInfoLinux}. + * + * @param args - arguments to this calculator test + */ + public static void main(String[] args) { + SysInfoLinux plugin = new SysInfoLinux(); + System.out.println("Physical memory Size (bytes) : " + + plugin.getPhysicalMemorySize()); + System.out.println("Total Virtual memory Size (bytes) : " + + plugin.getVirtualMemorySize()); + System.out.println("Available Physical memory Size (bytes) : " + + plugin.getAvailablePhysicalMemorySize()); + System.out.println("Total Available Virtual memory Size (bytes) : " + + plugin.getAvailableVirtualMemorySize()); + System.out.println("Number of Processors : " + plugin.getNumProcessors()); + System.out.println("CPU frequency (kHz) : " + plugin.getCpuFrequency()); + System.out.println("Cumulative CPU time (ms) : " + + plugin.getCumulativeCpuTime()); + try { + // Sleep so we can compute the CPU usage + Thread.sleep(500L); + } catch (InterruptedException e) { + // do nothing + } + System.out.println("CPU usage % : " + plugin.getCpuUsage()); + } + + @VisibleForTesting + void setReadCpuInfoFile(boolean readCpuInfoFileValue) { + this.readCpuInfoFile = readCpuInfoFileValue; + } + + public long getJiffyLengthInMillis() { + return this.jiffyLengthInMillis; + } +} diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/SysInfoWindows.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/SysInfoWindows.java new file mode 100644 index 00000000000..da4c1c5e879 --- /dev/null +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/SysInfoWindows.java @@ -0,0 +1,181 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.util; + +import java.io.IOException; + +import com.google.common.annotations.VisibleForTesting; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.util.Shell.ShellCommandExecutor; + +/** + * Plugin to calculate resource information on Windows systems. + */ +@InterfaceAudience.Private +@InterfaceStability.Evolving +public class SysInfoWindows extends SysInfo { + + private static final Log LOG = LogFactory.getLog(SysInfoWindows.class); + + private long vmemSize; + private long memSize; + private long vmemAvailable; + private long memAvailable; + private int numProcessors; + private long cpuFrequencyKhz; + private long cumulativeCpuTimeMs; + private float cpuUsage; + + private long lastRefreshTime; + static final int REFRESH_INTERVAL_MS = 1000; + + public SysInfoWindows() { + lastRefreshTime = 0; + reset(); + } + + @VisibleForTesting + long now() { + return System.nanoTime(); + } + + void reset() { + vmemSize = -1; + memSize = -1; + vmemAvailable = -1; + memAvailable = -1; + numProcessors = -1; + cpuFrequencyKhz = -1; + cumulativeCpuTimeMs = -1; + cpuUsage = -1; + } + + String getSystemInfoInfoFromShell() { + ShellCommandExecutor shellExecutor = new ShellCommandExecutor( + new String[] {Shell.WINUTILS, "systeminfo" }); + try { + shellExecutor.execute(); + return shellExecutor.getOutput(); + } catch (IOException e) { + LOG.error(StringUtils.stringifyException(e)); + } + return null; + } + + void refreshIfNeeded() { + long now = now(); + if (now - lastRefreshTime > REFRESH_INTERVAL_MS) { + long refreshInterval = now - lastRefreshTime; + lastRefreshTime = now; + long lastCumCpuTimeMs = cumulativeCpuTimeMs; + reset(); + String sysInfoStr = getSystemInfoInfoFromShell(); + if (sysInfoStr != null) { + final int sysInfoSplitCount = 7; + String[] sysInfo = sysInfoStr.substring(0, sysInfoStr.indexOf("\r\n")) + .split(","); + if (sysInfo.length == sysInfoSplitCount) { + try { + vmemSize = Long.parseLong(sysInfo[0]); + memSize = Long.parseLong(sysInfo[1]); + vmemAvailable = Long.parseLong(sysInfo[2]); + memAvailable = Long.parseLong(sysInfo[3]); + numProcessors = Integer.parseInt(sysInfo[4]); + cpuFrequencyKhz = Long.parseLong(sysInfo[5]); + cumulativeCpuTimeMs = Long.parseLong(sysInfo[6]); + if (lastCumCpuTimeMs != -1) { + cpuUsage = (cumulativeCpuTimeMs - lastCumCpuTimeMs) + / (refreshInterval * 1.0f); + } + } catch (NumberFormatException nfe) { + LOG.warn("Error parsing sysInfo", nfe); + } + } else { + LOG.warn("Expected split length of sysInfo to be " + + sysInfoSplitCount + ". Got " + sysInfo.length); + } + } + } + } + + /** {@inheritDoc} */ + @Override + public long getVirtualMemorySize() { + refreshIfNeeded(); + return vmemSize; + } + + /** {@inheritDoc} */ + @Override + public long getPhysicalMemorySize() { + refreshIfNeeded(); + return memSize; + } + + /** {@inheritDoc} */ + @Override + public long getAvailableVirtualMemorySize() { + refreshIfNeeded(); + return vmemAvailable; + } + + /** {@inheritDoc} */ + @Override + public long getAvailablePhysicalMemorySize() { + refreshIfNeeded(); + return memAvailable; + } + + /** {@inheritDoc} */ + @Override + public int getNumProcessors() { + refreshIfNeeded(); + return numProcessors; + } + + /** {@inheritDoc} */ + @Override + public int getNumCores() { + return getNumProcessors(); + } + + /** {@inheritDoc} */ + @Override + public long getCpuFrequency() { + refreshIfNeeded(); + return cpuFrequencyKhz; + } + + /** {@inheritDoc} */ + @Override + public long getCumulativeCpuTime() { + refreshIfNeeded(); + return cumulativeCpuTimeMs; + } + + /** {@inheritDoc} */ + @Override + public float getCpuUsage() { + refreshIfNeeded(); + return cpuUsage; + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/util/TestLinuxResourceCalculatorPlugin.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/util/TestSysInfoLinux.java similarity index 84% rename from hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/util/TestLinuxResourceCalculatorPlugin.java rename to hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/util/TestSysInfoLinux.java index a59d503976f..73edc774127 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/util/TestLinuxResourceCalculatorPlugin.java +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/util/TestSysInfoLinux.java @@ -16,12 +16,11 @@ * limitations under the License. */ -package org.apache.hadoop.yarn.util; +package org.apache.hadoop.util; import java.io.File; import java.io.FileWriter; import java.io.IOException; -import java.util.ArrayList; import java.util.Random; import org.apache.commons.io.IOUtils; @@ -31,30 +30,30 @@ import org.junit.Test; import static org.junit.Assert.assertEquals; /** - * A JUnit test to test {@link LinuxResourceCalculatorPlugin} + * A JUnit test to test {@link SysInfoLinux} * Create the fake /proc/ information and verify the parsing and calculation */ -public class TestLinuxResourceCalculatorPlugin { +public class TestSysInfoLinux { /** * LinuxResourceCalculatorPlugin with a fake timer */ static class FakeLinuxResourceCalculatorPlugin extends - LinuxResourceCalculatorPlugin { - - long currentTime = 0; - public FakeLinuxResourceCalculatorPlugin(String procfsMemFile, - String procfsCpuFile, - String procfsStatFile, - long jiffyLengthInMillis) { - super(procfsMemFile, procfsCpuFile, procfsStatFile, jiffyLengthInMillis); - } - @Override - long getCurrentTime() { - return currentTime; - } - public void advanceTime(long adv) { - currentTime += adv * this.getJiffyLengthInMillis(); - } + SysInfoLinux { + + long currentTime = 0; + public FakeLinuxResourceCalculatorPlugin(String procfsMemFile, + String procfsCpuFile, + String procfsStatFile, + long jiffyLengthInMillis) { + super(procfsMemFile, procfsCpuFile, procfsStatFile, jiffyLengthInMillis); + } + @Override + long getCurrentTime() { + return currentTime; + } + public void advanceTime(long adv) { + currentTime += adv * this.getJiffyLengthInMillis(); + } } private static final FakeLinuxResourceCalculatorPlugin plugin; private static String TEST_ROOT_DIR = new Path(System.getProperty( @@ -72,36 +71,36 @@ public class TestLinuxResourceCalculatorPlugin { FAKE_STATFILE, FAKE_JIFFY_LENGTH); } - static final String MEMINFO_FORMAT = - "MemTotal: %d kB\n" + - "MemFree: %d kB\n" + - "Buffers: 138244 kB\n" + - "Cached: 947780 kB\n" + - "SwapCached: 142880 kB\n" + - "Active: 3229888 kB\n" + - "Inactive: %d kB\n" + - "SwapTotal: %d kB\n" + - "SwapFree: %d kB\n" + - "Dirty: 122012 kB\n" + - "Writeback: 0 kB\n" + - "AnonPages: 2710792 kB\n" + - "Mapped: 24740 kB\n" + - "Slab: 132528 kB\n" + - "SReclaimable: 105096 kB\n" + - "SUnreclaim: 27432 kB\n" + - "PageTables: 11448 kB\n" + - "NFS_Unstable: 0 kB\n" + - "Bounce: 0 kB\n" + - "CommitLimit: 4125904 kB\n" + - "Committed_AS: 4143556 kB\n" + - "VmallocTotal: 34359738367 kB\n" + - "VmallocUsed: 1632 kB\n" + - "VmallocChunk: 34359736375 kB\n" + - "HugePages_Total: 0\n" + - "HugePages_Free: 0\n" + - "HugePages_Rsvd: 0\n" + - "Hugepagesize: 2048 kB"; - + static final String MEMINFO_FORMAT = + "MemTotal: %d kB\n" + + "MemFree: %d kB\n" + + "Buffers: 138244 kB\n" + + "Cached: 947780 kB\n" + + "SwapCached: 142880 kB\n" + + "Active: 3229888 kB\n" + + "Inactive: %d kB\n" + + "SwapTotal: %d kB\n" + + "SwapFree: %d kB\n" + + "Dirty: 122012 kB\n" + + "Writeback: 0 kB\n" + + "AnonPages: 2710792 kB\n" + + "Mapped: 24740 kB\n" + + "Slab: 132528 kB\n" + + "SReclaimable: 105096 kB\n" + + "SUnreclaim: 27432 kB\n" + + "PageTables: 11448 kB\n" + + "NFS_Unstable: 0 kB\n" + + "Bounce: 0 kB\n" + + "CommitLimit: 4125904 kB\n" + + "Committed_AS: 4143556 kB\n" + + "VmallocTotal: 34359738367 kB\n" + + "VmallocUsed: 1632 kB\n" + + "VmallocChunk: 34359736375 kB\n" + + "HugePages_Total: 0\n" + + "HugePages_Free: 0\n" + + "HugePages_Rsvd: 0\n" + + "Hugepagesize: 2048 kB"; + static final String CPUINFO_FORMAT = "processor : %s\n" + "vendor_id : AuthenticAMD\n" + @@ -128,8 +127,8 @@ public class TestLinuxResourceCalculatorPlugin { "cache_alignment : 64\n" + "address sizes : 40 bits physical, 48 bits virtual\n" + "power management: ts fid vid ttp"; - - static final String STAT_FILE_FORMAT = + + static final String STAT_FILE_FORMAT = "cpu %d %d %d 1646495089 831319 48713 164346 0\n" + "cpu0 15096055 30805 3823005 411456015 206027 13 14269 0\n" + "cpu1 14760561 89890 6432036 408707910 456857 48074 130857 0\n" + @@ -141,7 +140,7 @@ public class TestLinuxResourceCalculatorPlugin { "processes 26414943\n" + "procs_running 1\n" + "procs_blocked 0\n"; - + /** * Test parsing /proc/stat and /proc/cpuinfo * @throws IOException @@ -164,7 +163,7 @@ public class TestLinuxResourceCalculatorPlugin { fWriter.close(); assertEquals(plugin.getNumProcessors(), numProcessors); assertEquals(plugin.getCpuFrequency(), cpuFrequencyKHz); - + // Write fake /proc/stat file. long uTime = 54972994; long nTime = 188860; @@ -183,13 +182,13 @@ public class TestLinuxResourceCalculatorPlugin { assertEquals(plugin.getCumulativeCpuTime(), FAKE_JIFFY_LENGTH * (uTime + nTime + sTime)); assertEquals(plugin.getCpuUsage(), 6.25F, 0.0); - + // Advance the time and sample again. This time, we call getCpuUsage() only. uTime += 600L; plugin.advanceTime(300L); updateStatFile(uTime, nTime, sTime); assertEquals(plugin.getCpuUsage(), 25F, 0.0); - + // Advance very short period of time (one jiffy length). // In this case, CPU usage should not be updated. uTime += 1L; @@ -199,7 +198,7 @@ public class TestLinuxResourceCalculatorPlugin { FAKE_JIFFY_LENGTH * (uTime + nTime + sTime)); assertEquals(plugin.getCpuUsage(), 25F, 0.0); // CPU usage is not updated. } - + /** * Write information to fake /proc/stat file */ @@ -209,7 +208,7 @@ public class TestLinuxResourceCalculatorPlugin { fWriter.write(String.format(STAT_FILE_FORMAT, uTime, nTime, sTime)); fWriter.close(); } - + /** * Test parsing /proc/meminfo * @throws IOException @@ -226,7 +225,7 @@ public class TestLinuxResourceCalculatorPlugin { FileWriter fWriter = new FileWriter(FAKE_MEMFILE); fWriter.write(String.format(MEMINFO_FORMAT, memTotal, memFree, inactive, swapTotal, swapFree)); - + fWriter.close(); assertEquals(plugin.getAvailablePhysicalMemorySize(), 1024L * (memFree + inactive)); diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/util/TestSysInfoWindows.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/util/TestSysInfoWindows.java new file mode 100644 index 00000000000..7924c02a312 --- /dev/null +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/util/TestSysInfoWindows.java @@ -0,0 +1,100 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.util; + +import org.junit.Test; +import static org.junit.Assert.*; + +public class TestSysInfoWindows { + + + static class SysInfoWindowsMock extends SysInfoWindows { + private long time = SysInfoWindows.REFRESH_INTERVAL_MS + 1; + private String infoStr = null; + void setSysinfoString(String infoStr) { + this.infoStr = infoStr; + } + void advance(long dur) { + time += dur; + } + @Override + String getSystemInfoInfoFromShell() { + return infoStr; + } + @Override + long now() { + return time; + } + } + + @Test(timeout = 10000) + public void parseSystemInfoString() { + SysInfoWindowsMock tester = new SysInfoWindowsMock(); + tester.setSysinfoString( + "17177038848,8589467648,15232745472,6400417792,1,2805000,6261812\r\n"); + // info str derived from windows shell command has \r\n termination + assertEquals(17177038848L, tester.getVirtualMemorySize()); + assertEquals(8589467648L, tester.getPhysicalMemorySize()); + assertEquals(15232745472L, tester.getAvailableVirtualMemorySize()); + assertEquals(6400417792L, tester.getAvailablePhysicalMemorySize()); + assertEquals(1, tester.getNumProcessors()); + assertEquals(1, tester.getNumCores()); + assertEquals(2805000L, tester.getCpuFrequency()); + assertEquals(6261812L, tester.getCumulativeCpuTime()); + // undef on first call + assertEquals(-1.0, tester.getCpuUsage(), 0.0); + } + + @Test(timeout = 10000) + public void refreshAndCpuUsage() throws InterruptedException { + SysInfoWindowsMock tester = new SysInfoWindowsMock(); + tester.setSysinfoString( + "17177038848,8589467648,15232745472,6400417792,1,2805000,6261812\r\n"); + // info str derived from windows shell command has \r\n termination + tester.getAvailablePhysicalMemorySize(); + // verify information has been refreshed + assertEquals(6400417792L, tester.getAvailablePhysicalMemorySize()); + assertEquals(-1.0, tester.getCpuUsage(), 0.0); + + tester.setSysinfoString( + "17177038848,8589467648,15232745472,5400417792,1,2805000,6263012\r\n"); + tester.getAvailablePhysicalMemorySize(); + // verify information has not been refreshed + assertEquals(6400417792L, tester.getAvailablePhysicalMemorySize()); + assertEquals(-1.0, tester.getCpuUsage(), 0.0); + + // advance clock + tester.advance(SysInfoWindows.REFRESH_INTERVAL_MS + 1); + + // verify information has been refreshed + assertEquals(5400417792L, tester.getAvailablePhysicalMemorySize()); + assertEquals((6263012 - 6261812) / (SysInfoWindows.REFRESH_INTERVAL_MS + 1f), + tester.getCpuUsage(), 0.0); + } + + @Test(timeout = 10000) + public void errorInGetSystemInfo() { + SysInfoWindowsMock tester = new SysInfoWindowsMock(); + // info str derived from windows shell command has \r\n termination + tester.setSysinfoString(null); + // call a method to refresh values + tester.getAvailablePhysicalMemorySize(); + } + +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/LinuxResourceCalculatorPlugin.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/LinuxResourceCalculatorPlugin.java index bf4cfa4014c..f458f16ad47 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/LinuxResourceCalculatorPlugin.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/LinuxResourceCalculatorPlugin.java @@ -15,25 +15,11 @@ * See the License for the specific language governing permissions and * limitations under the License. */ - package org.apache.hadoop.yarn.util; -import java.io.BufferedReader; -import java.io.FileInputStream; -import java.io.FileNotFoundException; -import java.io.InputStreamReader; -import java.io.IOException; -import java.math.BigInteger; -import java.nio.charset.Charset; -import java.util.HashSet; -import java.util.regex.Matcher; -import java.util.regex.Pattern; - -import com.google.common.annotations.VisibleForTesting; -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.util.SysInfoLinux; /** * Plugin to calculate resource information on Linux systems. @@ -41,383 +27,9 @@ import org.apache.hadoop.classification.InterfaceStability; @InterfaceAudience.Private @InterfaceStability.Unstable public class LinuxResourceCalculatorPlugin extends ResourceCalculatorPlugin { - private static final Log LOG = - LogFactory.getLog(LinuxResourceCalculatorPlugin.class); - - /** - * proc's meminfo virtual file has keys-values in the format - * "key:[ \t]*value[ \t]kB". - */ - private static final String PROCFS_MEMFILE = "/proc/meminfo"; - private static final Pattern PROCFS_MEMFILE_FORMAT = - Pattern.compile("^([a-zA-Z]*):[ \t]*([0-9]*)[ \t]kB"); - - // We need the values for the following keys in meminfo - private static final String MEMTOTAL_STRING = "MemTotal"; - private static final String SWAPTOTAL_STRING = "SwapTotal"; - private static final String MEMFREE_STRING = "MemFree"; - private static final String SWAPFREE_STRING = "SwapFree"; - private static final String INACTIVE_STRING = "Inactive"; - - /** - * Patterns for parsing /proc/cpuinfo. - */ - private static final String PROCFS_CPUINFO = "/proc/cpuinfo"; - private static final Pattern PROCESSOR_FORMAT = - Pattern.compile("^processor[ \t]:[ \t]*([0-9]*)"); - private static final Pattern FREQUENCY_FORMAT = - Pattern.compile("^cpu MHz[ \t]*:[ \t]*([0-9.]*)"); - private static final Pattern PHYSICAL_ID_FORMAT = - Pattern.compile("^physical id[ \t]*:[ \t]*([0-9]*)"); - private static final Pattern CORE_ID_FORMAT = - Pattern.compile("^core id[ \t]*:[ \t]*([0-9]*)"); - - /** - * Pattern for parsing /proc/stat. - */ - private static final String PROCFS_STAT = "/proc/stat"; - private static final Pattern CPU_TIME_FORMAT = - Pattern.compile("^cpu[ \t]*([0-9]*)" + - "[ \t]*([0-9]*)[ \t]*([0-9]*)[ \t].*"); - private CpuTimeTracker cpuTimeTracker; - - private String procfsMemFile; - private String procfsCpuFile; - private String procfsStatFile; - private long jiffyLengthInMillis; - - private long ramSize = 0; - private long swapSize = 0; - private long ramSizeFree = 0; // free ram space on the machine (kB) - private long swapSizeFree = 0; // free swap space on the machine (kB) - private long inactiveSize = 0; // inactive cache memory (kB) - /* number of logical processors on the system. */ - private int numProcessors = 0; - /* number of physical cores on the system. */ - private int numCores = 0; - private long cpuFrequency = 0L; // CPU frequency on the system (kHz) - - private boolean readMemInfoFile = false; - private boolean readCpuInfoFile = false; - - /** - * Get current time. - * @return Unix time stamp in millisecond - */ - long getCurrentTime() { - return System.currentTimeMillis(); - } public LinuxResourceCalculatorPlugin() { - this(PROCFS_MEMFILE, PROCFS_CPUINFO, PROCFS_STAT, - ProcfsBasedProcessTree.JIFFY_LENGTH_IN_MILLIS); + super(new SysInfoLinux()); } - /** - * Constructor which allows assigning the /proc/ directories. This will be - * used only in unit tests. - * @param procfsMemFile fake file for /proc/meminfo - * @param procfsCpuFile fake file for /proc/cpuinfo - * @param procfsStatFile fake file for /proc/stat - * @param jiffyLengthInMillis fake jiffy length value - */ - public LinuxResourceCalculatorPlugin(String procfsMemFile, - String procfsCpuFile, - String procfsStatFile, - long jiffyLengthInMillis) { - this.procfsMemFile = procfsMemFile; - this.procfsCpuFile = procfsCpuFile; - this.procfsStatFile = procfsStatFile; - this.jiffyLengthInMillis = jiffyLengthInMillis; - this.cpuTimeTracker = new CpuTimeTracker(jiffyLengthInMillis); - } - - /** - * Read /proc/meminfo, parse and compute memory information only once. - */ - private void readProcMemInfoFile() { - readProcMemInfoFile(false); - } - - /** - * Read /proc/meminfo, parse and compute memory information. - * @param readAgain if false, read only on the first time - */ - private void readProcMemInfoFile(boolean readAgain) { - - if (readMemInfoFile && !readAgain) { - return; - } - - // Read "/proc/memInfo" file - BufferedReader in; - InputStreamReader fReader; - try { - fReader = new InputStreamReader( - new FileInputStream(procfsMemFile), Charset.forName("UTF-8")); - in = new BufferedReader(fReader); - } catch (FileNotFoundException f) { - // shouldn't happen.... - LOG.warn("Couldn't read " + procfsMemFile - + "; can't determine memory settings"); - return; - } - - Matcher mat; - - try { - String str = in.readLine(); - while (str != null) { - mat = PROCFS_MEMFILE_FORMAT.matcher(str); - if (mat.find()) { - if (mat.group(1).equals(MEMTOTAL_STRING)) { - ramSize = Long.parseLong(mat.group(2)); - } else if (mat.group(1).equals(SWAPTOTAL_STRING)) { - swapSize = Long.parseLong(mat.group(2)); - } else if (mat.group(1).equals(MEMFREE_STRING)) { - ramSizeFree = Long.parseLong(mat.group(2)); - } else if (mat.group(1).equals(SWAPFREE_STRING)) { - swapSizeFree = Long.parseLong(mat.group(2)); - } else if (mat.group(1).equals(INACTIVE_STRING)) { - inactiveSize = Long.parseLong(mat.group(2)); - } - } - str = in.readLine(); - } - } catch (IOException io) { - LOG.warn("Error reading the stream " + io); - } finally { - // Close the streams - try { - fReader.close(); - try { - in.close(); - } catch (IOException i) { - LOG.warn("Error closing the stream " + in); - } - } catch (IOException i) { - LOG.warn("Error closing the stream " + fReader); - } - } - - readMemInfoFile = true; - } - - /** - * Read /proc/cpuinfo, parse and calculate CPU information. - */ - private void readProcCpuInfoFile() { - // This directory needs to be read only once - if (readCpuInfoFile) { - return; - } - HashSet coreIdSet = new HashSet<>(); - // Read "/proc/cpuinfo" file - BufferedReader in; - InputStreamReader fReader; - try { - fReader = new InputStreamReader( - new FileInputStream(procfsCpuFile), Charset.forName("UTF-8")); - in = new BufferedReader(fReader); - } catch (FileNotFoundException f) { - // shouldn't happen.... - LOG.warn("Couldn't read " + procfsCpuFile + "; can't determine cpu info"); - return; - } - Matcher mat; - try { - numProcessors = 0; - numCores = 1; - String currentPhysicalId = ""; - String str = in.readLine(); - while (str != null) { - mat = PROCESSOR_FORMAT.matcher(str); - if (mat.find()) { - numProcessors++; - } - mat = FREQUENCY_FORMAT.matcher(str); - if (mat.find()) { - cpuFrequency = (long)(Double.parseDouble(mat.group(1)) * 1000); // kHz - } - mat = PHYSICAL_ID_FORMAT.matcher(str); - if (mat.find()) { - currentPhysicalId = str; - } - mat = CORE_ID_FORMAT.matcher(str); - if (mat.find()) { - coreIdSet.add(currentPhysicalId + " " + str); - numCores = coreIdSet.size(); - } - str = in.readLine(); - } - } catch (IOException io) { - LOG.warn("Error reading the stream " + io); - } finally { - // Close the streams - try { - fReader.close(); - try { - in.close(); - } catch (IOException i) { - LOG.warn("Error closing the stream " + in); - } - } catch (IOException i) { - LOG.warn("Error closing the stream " + fReader); - } - } - readCpuInfoFile = true; - } - - /** - * Read /proc/stat file, parse and calculate cumulative CPU. - */ - private void readProcStatFile() { - // Read "/proc/stat" file - BufferedReader in; - InputStreamReader fReader; - try { - fReader = new InputStreamReader( - new FileInputStream(procfsStatFile), Charset.forName("UTF-8")); - in = new BufferedReader(fReader); - } catch (FileNotFoundException f) { - // shouldn't happen.... - return; - } - - Matcher mat; - try { - String str = in.readLine(); - while (str != null) { - mat = CPU_TIME_FORMAT.matcher(str); - if (mat.find()) { - long uTime = Long.parseLong(mat.group(1)); - long nTime = Long.parseLong(mat.group(2)); - long sTime = Long.parseLong(mat.group(3)); - cpuTimeTracker.updateElapsedJiffies( - BigInteger.valueOf(uTime + nTime + sTime), - getCurrentTime()); - break; - } - str = in.readLine(); - } - } catch (IOException io) { - LOG.warn("Error reading the stream " + io); - } finally { - // Close the streams - try { - fReader.close(); - try { - in.close(); - } catch (IOException i) { - LOG.warn("Error closing the stream " + in); - } - } catch (IOException i) { - LOG.warn("Error closing the stream " + fReader); - } - } - } - - /** {@inheritDoc} */ - @Override - public long getPhysicalMemorySize() { - readProcMemInfoFile(); - return ramSize * 1024; - } - - /** {@inheritDoc} */ - @Override - public long getVirtualMemorySize() { - readProcMemInfoFile(); - return (ramSize + swapSize) * 1024; - } - - /** {@inheritDoc} */ - @Override - public long getAvailablePhysicalMemorySize() { - readProcMemInfoFile(true); - return (ramSizeFree + inactiveSize) * 1024; - } - - /** {@inheritDoc} */ - @Override - public long getAvailableVirtualMemorySize() { - readProcMemInfoFile(true); - return (ramSizeFree + swapSizeFree + inactiveSize) * 1024; - } - - /** {@inheritDoc} */ - @Override - public int getNumProcessors() { - readProcCpuInfoFile(); - return numProcessors; - } - - /** {@inheritDoc} */ - @Override - public int getNumCores() { - readProcCpuInfoFile(); - return numCores; - } - - /** {@inheritDoc} */ - @Override - public long getCpuFrequency() { - readProcCpuInfoFile(); - return cpuFrequency; - } - - /** {@inheritDoc} */ - @Override - public long getCumulativeCpuTime() { - readProcStatFile(); - return cpuTimeTracker.cumulativeCpuTime.longValue(); - } - - /** {@inheritDoc} */ - @Override - public float getCpuUsage() { - readProcStatFile(); - float overallCpuUsage = cpuTimeTracker.getCpuTrackerUsagePercent(); - if (overallCpuUsage != CpuTimeTracker.UNAVAILABLE) { - overallCpuUsage = overallCpuUsage / getNumProcessors(); - } - return overallCpuUsage; - } - - /** - * Test the {@link LinuxResourceCalculatorPlugin}. - * - * @param args - arguments to this calculator test - */ - public static void main(String[] args) { - LinuxResourceCalculatorPlugin plugin = new LinuxResourceCalculatorPlugin(); - System.out.println("Physical memory Size (bytes) : " - + plugin.getPhysicalMemorySize()); - System.out.println("Total Virtual memory Size (bytes) : " - + plugin.getVirtualMemorySize()); - System.out.println("Available Physical memory Size (bytes) : " - + plugin.getAvailablePhysicalMemorySize()); - System.out.println("Total Available Virtual memory Size (bytes) : " - + plugin.getAvailableVirtualMemorySize()); - System.out.println("Number of Processors : " + plugin.getNumProcessors()); - System.out.println("CPU frequency (kHz) : " + plugin.getCpuFrequency()); - System.out.println("Cumulative CPU time (ms) : " + - plugin.getCumulativeCpuTime()); - try { - // Sleep so we can compute the CPU usage - Thread.sleep(500L); - } catch (InterruptedException e) { - // do nothing - } - System.out.println("CPU usage % : " + plugin.getCpuUsage()); - } - - @VisibleForTesting - void setReadCpuInfoFile(boolean readCpuInfoFileValue) { - this.readCpuInfoFile = readCpuInfoFileValue; - } - - public long getJiffyLengthInMillis() { - return this.jiffyLengthInMillis; - } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/ProcfsBasedProcessTree.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/ProcfsBasedProcessTree.java index df9d28a61ee..2345c6273ad 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/ProcfsBasedProcessTree.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/ProcfsBasedProcessTree.java @@ -40,9 +40,9 @@ import org.apache.commons.logging.LogFactory; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.util.CpuTimeTracker; import org.apache.hadoop.util.Shell; -import org.apache.hadoop.util.Shell.ShellCommandExecutor; -import org.apache.hadoop.util.StringUtils; +import org.apache.hadoop.util.SysInfoLinux; import org.apache.hadoop.yarn.conf.YarnConfiguration; /** @@ -64,8 +64,9 @@ public class ProcfsBasedProcessTree extends ResourceCalculatorProcessTree { public static final String PROCFS_STAT_FILE = "stat"; public static final String PROCFS_CMDLINE_FILE = "cmdline"; - public static final long PAGE_SIZE; - public static final long JIFFY_LENGTH_IN_MILLIS; // in millisecond + public static final long PAGE_SIZE = SysInfoLinux.PAGE_SIZE; + public static final long JIFFY_LENGTH_IN_MILLIS = + SysInfoLinux.JIFFY_LENGTH_IN_MILLIS; // in millisecond private final CpuTimeTracker cpuTimeTracker; private Clock clock; @@ -108,31 +109,6 @@ public class ProcfsBasedProcessTree extends ResourceCalculatorProcessTree { protected Map processSMAPTree = new HashMap(); - static { - long jiffiesPerSecond = -1; - long pageSize = -1; - try { - if(Shell.LINUX) { - ShellCommandExecutor shellExecutorClk = new ShellCommandExecutor( - new String[] { "getconf", "CLK_TCK" }); - shellExecutorClk.execute(); - jiffiesPerSecond = Long.parseLong(shellExecutorClk.getOutput().replace("\n", "")); - - ShellCommandExecutor shellExecutorPage = new ShellCommandExecutor( - new String[] { "getconf", "PAGESIZE" }); - shellExecutorPage.execute(); - pageSize = Long.parseLong(shellExecutorPage.getOutput().replace("\n", "")); - - } - } catch (IOException e) { - LOG.error(StringUtils.stringifyException(e)); - } finally { - JIFFY_LENGTH_IN_MILLIS = jiffiesPerSecond != -1 ? - Math.round(1000D / jiffiesPerSecond) : -1; - PAGE_SIZE = pageSize; - } - } - // to enable testing, using this variable which can be configured // to a test directory. private String procfsDir; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/ResourceCalculatorPlugin.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/ResourceCalculatorPlugin.java index 40bd44ee250..5e5f1b49032 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/ResourceCalculatorPlugin.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/ResourceCalculatorPlugin.java @@ -15,7 +15,6 @@ * See the License for the specific language governing permissions and * limitations under the License. */ - package org.apache.hadoop.yarn.util; import org.apache.hadoop.classification.InterfaceAudience; @@ -23,29 +22,42 @@ import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configured; import org.apache.hadoop.util.ReflectionUtils; -import org.apache.hadoop.util.Shell; +import org.apache.hadoop.util.SysInfo; /** * Plugin to calculate resource information on the system. - * */ @InterfaceAudience.LimitedPrivate({"YARN", "MAPREDUCE"}) @InterfaceStability.Unstable -public abstract class ResourceCalculatorPlugin extends Configured { +public class ResourceCalculatorPlugin extends Configured { + + private final SysInfo sys; + + protected ResourceCalculatorPlugin() { + this(SysInfo.newInstance()); + } + + public ResourceCalculatorPlugin(SysInfo sys) { + this.sys = sys; + } /** * Obtain the total size of the virtual memory present in the system. * * @return virtual memory size in bytes. */ - public abstract long getVirtualMemorySize(); + public long getVirtualMemorySize() { + return sys.getVirtualMemorySize(); + } /** * Obtain the total size of the physical memory present in the system. * * @return physical memory size bytes. */ - public abstract long getPhysicalMemorySize(); + public long getPhysicalMemorySize() { + return sys.getPhysicalMemorySize(); + } /** * Obtain the total size of the available virtual memory present @@ -53,7 +65,9 @@ public abstract class ResourceCalculatorPlugin extends Configured { * * @return available virtual memory size in bytes. */ - public abstract long getAvailableVirtualMemorySize(); + public long getAvailableVirtualMemorySize() { + return sys.getAvailableVirtualMemorySize(); + } /** * Obtain the total size of the available physical memory present @@ -61,42 +75,54 @@ public abstract class ResourceCalculatorPlugin extends Configured { * * @return available physical memory size bytes. */ - public abstract long getAvailablePhysicalMemorySize(); + public long getAvailablePhysicalMemorySize() { + return sys.getAvailablePhysicalMemorySize(); + } /** * Obtain the total number of logical processors present on the system. * * @return number of logical processors */ - public abstract int getNumProcessors(); + public int getNumProcessors() { + return sys.getNumProcessors(); + } /** * Obtain total number of physical cores present on the system. * * @return number of physical cores */ - public abstract int getNumCores(); + public int getNumCores() { + return sys.getNumCores(); + } /** * Obtain the CPU frequency of on the system. * * @return CPU frequency in kHz */ - public abstract long getCpuFrequency(); + public long getCpuFrequency() { + return sys.getCpuFrequency(); + } /** * Obtain the cumulative CPU time since the system is on. * * @return cumulative CPU time in milliseconds */ - public abstract long getCumulativeCpuTime(); + public long getCumulativeCpuTime() { + return sys.getCumulativeCpuTime(); + } /** * Obtain the CPU usage % of the machine. Return -1 if it is unavailable * * @return CPU usage in % */ - public abstract float getCpuUsage(); + public float getCpuUsage() { + return sys.getCpuUsage(); + } /** * Create the ResourceCalculatorPlugin from the class name and configure it. If @@ -114,21 +140,11 @@ public abstract class ResourceCalculatorPlugin extends Configured { if (clazz != null) { return ReflectionUtils.newInstance(clazz, conf); } - - // No class given, try a os specific class try { - if (Shell.LINUX) { - return new LinuxResourceCalculatorPlugin(); - } - if (Shell.WINDOWS) { - return new WindowsResourceCalculatorPlugin(); - } - } catch (SecurityException se) { - // Failed to get Operating System name. + return new ResourceCalculatorPlugin(); + } catch (SecurityException e) { return null; } - - // Not supported on this system. - return null; } + } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/WindowsBasedProcessTree.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/WindowsBasedProcessTree.java index 7d9c7d32492..ebe8df12547 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/WindowsBasedProcessTree.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/WindowsBasedProcessTree.java @@ -229,7 +229,7 @@ public class WindowsBasedProcessTree extends ResourceCalculatorProcessTree { @Override public float getCpuUsagePercent() { - return CpuTimeTracker.UNAVAILABLE; + return UNAVAILABLE; } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/WindowsResourceCalculatorPlugin.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/WindowsResourceCalculatorPlugin.java index cdbf5254ec8..f817b7a9d9f 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/WindowsResourceCalculatorPlugin.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/WindowsResourceCalculatorPlugin.java @@ -15,162 +15,18 @@ * See the License for the specific language governing permissions and * limitations under the License. */ - package org.apache.hadoop.yarn.util; -import java.io.IOException; +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.util.SysInfoWindows; -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.classification.InterfaceAudience.Private; -import org.apache.hadoop.util.Shell; -import org.apache.hadoop.util.Shell.ShellCommandExecutor; -import org.apache.hadoop.util.StringUtils; - -@Private +@InterfaceAudience.Private +@InterfaceStability.Unstable public class WindowsResourceCalculatorPlugin extends ResourceCalculatorPlugin { - - static final Log LOG = LogFactory - .getLog(WindowsResourceCalculatorPlugin.class); - - long vmemSize; - long memSize; - long vmemAvailable; - long memAvailable; - int numProcessors; - long cpuFrequencyKhz; - long cumulativeCpuTimeMs; - float cpuUsage; - - long lastRefreshTime; - private final int refreshIntervalMs = 1000; - - WindowsBasedProcessTree pTree = null; - + public WindowsResourceCalculatorPlugin() { - lastRefreshTime = 0; - reset(); - } - - void reset() { - vmemSize = -1; - memSize = -1; - vmemAvailable = -1; - memAvailable = -1; - numProcessors = -1; - cpuFrequencyKhz = -1; - cumulativeCpuTimeMs = -1; - cpuUsage = -1; + super(new SysInfoWindows()); } - String getSystemInfoInfoFromShell() { - ShellCommandExecutor shellExecutor = new ShellCommandExecutor( - new String[] { Shell.WINUTILS, "systeminfo" }); - try { - shellExecutor.execute(); - return shellExecutor.getOutput(); - } catch (IOException e) { - LOG.error(StringUtils.stringifyException(e)); - } - return null; - } - - void refreshIfNeeded() { - long now = System.currentTimeMillis(); - if (now - lastRefreshTime > refreshIntervalMs) { - long refreshInterval = now - lastRefreshTime; - lastRefreshTime = now; - long lastCumCpuTimeMs = cumulativeCpuTimeMs; - reset(); - String sysInfoStr = getSystemInfoInfoFromShell(); - if (sysInfoStr != null) { - final int sysInfoSplitCount = 7; - String[] sysInfo = sysInfoStr.substring(0, sysInfoStr.indexOf("\r\n")) - .split(","); - if (sysInfo.length == sysInfoSplitCount) { - try { - vmemSize = Long.parseLong(sysInfo[0]); - memSize = Long.parseLong(sysInfo[1]); - vmemAvailable = Long.parseLong(sysInfo[2]); - memAvailable = Long.parseLong(sysInfo[3]); - numProcessors = Integer.parseInt(sysInfo[4]); - cpuFrequencyKhz = Long.parseLong(sysInfo[5]); - cumulativeCpuTimeMs = Long.parseLong(sysInfo[6]); - if (lastCumCpuTimeMs != -1) { - cpuUsage = (cumulativeCpuTimeMs - lastCumCpuTimeMs) - / (refreshInterval * 1.0f); - } - - } catch (NumberFormatException nfe) { - LOG.warn("Error parsing sysInfo." + nfe); - } - } else { - LOG.warn("Expected split length of sysInfo to be " - + sysInfoSplitCount + ". Got " + sysInfo.length); - } - } - } - } - - /** {@inheritDoc} */ - @Override - public long getVirtualMemorySize() { - refreshIfNeeded(); - return vmemSize; - } - - /** {@inheritDoc} */ - @Override - public long getPhysicalMemorySize() { - refreshIfNeeded(); - return memSize; - } - - /** {@inheritDoc} */ - @Override - public long getAvailableVirtualMemorySize() { - refreshIfNeeded(); - return vmemAvailable; - } - - /** {@inheritDoc} */ - @Override - public long getAvailablePhysicalMemorySize() { - refreshIfNeeded(); - return memAvailable; - } - - /** {@inheritDoc} */ - @Override - public int getNumProcessors() { - refreshIfNeeded(); - return numProcessors; - } - - /** {@inheritDoc} */ - @Override - public int getNumCores() { - return getNumProcessors(); - } - - /** {@inheritDoc} */ - @Override - public long getCpuFrequency() { - refreshIfNeeded(); - return cpuFrequencyKhz; - } - - /** {@inheritDoc} */ - @Override - public long getCumulativeCpuTime() { - refreshIfNeeded(); - return cumulativeCpuTimeMs; - } - - /** {@inheritDoc} */ - @Override - public float getCpuUsage() { - refreshIfNeeded(); - return cpuUsage; - } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/util/TestResourceCalculatorProcessTree.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/util/TestResourceCalculatorProcessTree.java index 777ea9f596e..7a3e0e78666 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/util/TestResourceCalculatorProcessTree.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/util/TestResourceCalculatorProcessTree.java @@ -65,7 +65,7 @@ public class TestResourceCalculatorProcessTree { @Override public float getCpuUsagePercent() { - return CpuTimeTracker.UNAVAILABLE; + return UNAVAILABLE; } public boolean checkPidPgrpidForMatch() { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/util/TestWindowsResourceCalculatorPlugin.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/util/TestWindowsResourceCalculatorPlugin.java deleted file mode 100644 index a9e20bc0644..00000000000 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/util/TestWindowsResourceCalculatorPlugin.java +++ /dev/null @@ -1,86 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hadoop.yarn.util; - -import org.junit.Test; -import static org.junit.Assert.assertTrue; - -public class TestWindowsResourceCalculatorPlugin { - - - class WindowsResourceCalculatorPluginTester extends WindowsResourceCalculatorPlugin { - private String infoStr = null; - @Override - String getSystemInfoInfoFromShell() { - return infoStr; - } - } - - @Test (timeout = 30000) - public void parseSystemInfoString() { - WindowsResourceCalculatorPluginTester tester = new WindowsResourceCalculatorPluginTester(); - // info str derived from windows shell command has \r\n termination - tester.infoStr = "17177038848,8589467648,15232745472,6400417792,1,2805000,6261812\r\n"; - // call a method to refresh values - tester.getAvailablePhysicalMemorySize(); - // verify information has been refreshed - assertTrue(tester.vmemSize == 17177038848L); - assertTrue(tester.memSize == 8589467648L); - assertTrue(tester.vmemAvailable == 15232745472L); - assertTrue(tester.memAvailable == 6400417792L); - assertTrue(tester.numProcessors == 1); - assertTrue(tester.cpuFrequencyKhz == 2805000L); - assertTrue(tester.cumulativeCpuTimeMs == 6261812L); - assertTrue(tester.cpuUsage == -1); - } - - @Test (timeout = 20000) - public void refreshAndCpuUsage() throws InterruptedException { - WindowsResourceCalculatorPluginTester tester = new WindowsResourceCalculatorPluginTester(); - // info str derived from windows shell command has \r\n termination - tester.infoStr = "17177038848,8589467648,15232745472,6400417792,1,2805000,6261812\r\n"; - tester.getAvailablePhysicalMemorySize(); - // verify information has been refreshed - assertTrue(tester.memAvailable == 6400417792L); - assertTrue(tester.cpuUsage == -1); - - tester.infoStr = "17177038848,8589467648,15232745472,5400417792,1,2805000,6261812\r\n"; - tester.getAvailablePhysicalMemorySize(); - // verify information has not been refreshed - assertTrue(tester.memAvailable == 6400417792L); - assertTrue(tester.cpuUsage == -1); - - Thread.sleep(1500); - tester.infoStr = "17177038848,8589467648,15232745472,5400417792,1,2805000,6286812\r\n"; - tester.getAvailablePhysicalMemorySize(); - // verify information has been refreshed - assertTrue(tester.memAvailable == 5400417792L); - assertTrue(tester.cpuUsage >= 0.1); - } - - @Test (timeout = 20000) - public void errorInGetSystemInfo() { - WindowsResourceCalculatorPluginTester tester = new WindowsResourceCalculatorPluginTester(); - // info str derived from windows shell command has \r\n termination - tester.infoStr = null; - // call a method to refresh values - tester.getAvailablePhysicalMemorySize(); - } - -}