HADOOP-12211. Collect disk usage on the node. Contributed by Robert Grandl

This commit is contained in:
Chris Douglas 2015-07-13 15:36:11 -07:00
parent 9ef03a4c5b
commit a431ed9075
7 changed files with 293 additions and 4 deletions

View File

@ -695,6 +695,8 @@ Release 2.8.0 - UNRELEASED
HADOOP-12210. Collect network usage on the node (Robert Grandl via cdouglas)
HADOOP-12211. Collect disk usage on the node (Robert Grandl via cdouglas)
OPTIMIZATIONS
HADOOP-11785. Reduce the number of listStatus operation in distcp

View File

@ -120,4 +120,18 @@ public abstract class SysInfo {
*/
public abstract long getNetworkBytesWritten();
/**
* Obtain the aggregated number of bytes read from disks.
*
* @return total number of bytes read.
*/
public abstract long getStorageBytesRead();
/**
* Obtain the aggregated number of bytes written to disks.
*
* @return total number of bytes written.
*/
public abstract long getStorageBytesWritten();
}

View File

@ -25,6 +25,7 @@ import java.io.InputStreamReader;
import java.io.IOException;
import java.math.BigInteger;
import java.nio.charset.Charset;
import java.util.HashMap;
import java.util.HashSet;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
@ -94,11 +95,27 @@ public class SysInfoLinux extends SysInfo {
"[ \t]*([0-9]+)[ \t]*([0-9]+)[ \t]*([0-9]+)[ \t]*([0-9]+)" +
"[ \t]*([0-9]+)[ \t]*([0-9]+)[ \t]*([0-9]+)[ \t]*([0-9]+).*");
/**
* Pattern for parsing /proc/diskstats.
*/
private static final String PROCFS_DISKSFILE = "/proc/diskstats";
private static final Pattern PROCFS_DISKSFILE_FORMAT =
Pattern.compile("^[ \t]*([0-9]+)[ \t]*([0-9 ]+)" +
"(?!([a-zA-Z]+[0-9]+))([a-zA-Z]+)" +
"[ \t]*([0-9]+)[ \t]*([0-9]+)[ \t]*([0-9]+)[ \t]*([0-9]+)" +
"[ \t]*([0-9]+)[ \t]*([0-9]+)[ \t]*([0-9]+)[ \t]*([0-9]+)" +
"[ \t]*([0-9]+)[ \t]*([0-9]+)[ \t]*([0-9]+)");
/**
* Pattern for parsing /sys/block/partition_name/queue/hw_sector_size.
*/
private static final Pattern PROCFS_DISKSECTORFILE_FORMAT =
Pattern.compile("^([0-9]+)");
private String procfsMemFile;
private String procfsCpuFile;
private String procfsStatFile;
private String procfsNetFile;
private String procfsDisksFile;
private long jiffyLengthInMillis;
private long ramSize = 0;
@ -113,10 +130,15 @@ public class SysInfoLinux extends SysInfo {
private long cpuFrequency = 0L; // CPU frequency on the system (kHz)
private long numNetBytesRead = 0L; // aggregated bytes read from network
private long numNetBytesWritten = 0L; // aggregated bytes written to network
private long numDisksBytesRead = 0L; // aggregated bytes read from disks
private long numDisksBytesWritten = 0L; // aggregated bytes written to disks
private boolean readMemInfoFile = false;
private boolean readCpuInfoFile = false;
/* map for every disk its sector size */
private HashMap<String, Integer> perDiskSectorSize = null;
public static final long PAGE_SIZE = getConf("PAGESIZE");
public static final long JIFFY_LENGTH_IN_MILLIS =
Math.max(Math.round(1000D / getConf("CLK_TCK")), -1);
@ -145,7 +167,7 @@ public class SysInfoLinux extends SysInfo {
public SysInfoLinux() {
this(PROCFS_MEMFILE, PROCFS_CPUINFO, PROCFS_STAT,
PROCFS_NETFILE, JIFFY_LENGTH_IN_MILLIS);
PROCFS_NETFILE, PROCFS_DISKSFILE, JIFFY_LENGTH_IN_MILLIS);
}
/**
@ -155,6 +177,7 @@ public class SysInfoLinux extends SysInfo {
* @param procfsCpuFile fake file for /proc/cpuinfo
* @param procfsStatFile fake file for /proc/stat
* @param procfsNetFile fake file for /proc/net/dev
* @param procfsDisksFile fake file for /proc/diskstats
* @param jiffyLengthInMillis fake jiffy length value
*/
@VisibleForTesting
@ -162,13 +185,16 @@ public class SysInfoLinux extends SysInfo {
String procfsCpuFile,
String procfsStatFile,
String procfsNetFile,
String procfsDisksFile,
long jiffyLengthInMillis) {
this.procfsMemFile = procfsMemFile;
this.procfsCpuFile = procfsCpuFile;
this.procfsStatFile = procfsStatFile;
this.procfsNetFile = procfsNetFile;
this.procfsDisksFile = procfsDisksFile;
this.jiffyLengthInMillis = jiffyLengthInMillis;
this.cpuTimeTracker = new CpuTimeTracker(jiffyLengthInMillis);
this.perDiskSectorSize = new HashMap<String, Integer>();
}
/**
@ -411,6 +437,119 @@ public class SysInfoLinux extends SysInfo {
}
}
/**
* Read /proc/diskstats file, parse and calculate amount
* of bytes read and written from/to disks.
*/
private void readProcDisksInfoFile() {
numDisksBytesRead = 0L;
numDisksBytesWritten = 0L;
// Read "/proc/diskstats" file
BufferedReader in;
try {
in = new BufferedReader(new InputStreamReader(
new FileInputStream(procfsDisksFile), Charset.forName("UTF-8")));
} catch (FileNotFoundException f) {
return;
}
Matcher mat;
try {
String str = in.readLine();
while (str != null) {
mat = PROCFS_DISKSFILE_FORMAT.matcher(str);
if (mat.find()) {
String diskName = mat.group(4);
assert diskName != null;
// ignore loop or ram partitions
if (diskName.contains("loop") || diskName.contains("ram")) {
str = in.readLine();
continue;
}
Integer sectorSize;
synchronized (perDiskSectorSize) {
sectorSize = perDiskSectorSize.get(diskName);
if (null == sectorSize) {
// retrieve sectorSize
// if unavailable or error, assume 512
sectorSize = readDiskBlockInformation(diskName, 512);
perDiskSectorSize.put(diskName, sectorSize);
}
}
String sectorsRead = mat.group(7);
String sectorsWritten = mat.group(11);
if (null == sectorsRead || null == sectorsWritten) {
return;
}
numDisksBytesRead += Long.parseLong(sectorsRead) * sectorSize;
numDisksBytesWritten += Long.parseLong(sectorsWritten) * sectorSize;
}
str = in.readLine();
}
} catch (IOException e) {
LOG.warn("Error reading the stream " + procfsDisksFile, e);
} finally {
// Close the streams
try {
in.close();
} catch (IOException e) {
LOG.warn("Error closing the stream " + procfsDisksFile, e);
}
}
}
/**
* Read /sys/block/diskName/queue/hw_sector_size file, parse and calculate
* sector size for a specific disk.
* @return sector size of specified disk, or defSector
*/
int readDiskBlockInformation(String diskName, int defSector) {
assert perDiskSectorSize != null && diskName != null;
String procfsDiskSectorFile =
"/sys/block/" + diskName + "/queue/hw_sector_size";
BufferedReader in;
try {
in = new BufferedReader(new InputStreamReader(
new FileInputStream(procfsDiskSectorFile),
Charset.forName("UTF-8")));
} catch (FileNotFoundException f) {
return defSector;
}
Matcher mat;
try {
String str = in.readLine();
while (str != null) {
mat = PROCFS_DISKSECTORFILE_FORMAT.matcher(str);
if (mat.find()) {
String secSize = mat.group(1);
if (secSize != null) {
return Integer.parseInt(secSize);
}
}
str = in.readLine();
}
return defSector;
} catch (IOException|NumberFormatException e) {
LOG.warn("Error reading the stream " + procfsDiskSectorFile, e);
return defSector;
} finally {
// Close the streams
try {
in.close();
} catch (IOException e) {
LOG.warn("Error closing the stream " + procfsDiskSectorFile, e);
}
}
}
/** {@inheritDoc} */
@Override
public long getPhysicalMemorySize() {
@ -492,6 +631,18 @@ public class SysInfoLinux extends SysInfo {
return numNetBytesWritten;
}
@Override
public long getStorageBytesRead() {
readProcDisksInfoFile();
return numDisksBytesRead;
}
@Override
public long getStorageBytesWritten() {
readProcDisksInfoFile();
return numDisksBytesWritten;
}
/**
* Test the {@link SysInfoLinux}.
*
@ -515,6 +666,10 @@ public class SysInfoLinux extends SysInfo {
+ plugin.getNetworkBytesRead());
System.out.println("Total network written (bytes) : "
+ plugin.getNetworkBytesWritten());
System.out.println("Total storage read (bytes) : "
+ plugin.getStorageBytesRead());
System.out.println("Total storage written (bytes) : "
+ plugin.getStorageBytesWritten());
try {
// Sleep so we can compute the CPU usage
Thread.sleep(500L);

View File

@ -193,4 +193,16 @@ public class SysInfoWindows extends SysInfo {
return 0L;
}
@Override
public long getStorageBytesRead() {
// TODO unimplemented
return 0L;
}
@Override
public long getStorageBytesWritten() {
// TODO unimplemented
return 0L;
}
}

View File

@ -37,17 +37,18 @@ public class TestSysInfoLinux {
/**
* LinuxResourceCalculatorPlugin with a fake timer
*/
static class FakeLinuxResourceCalculatorPlugin extends
SysInfoLinux {
static class FakeLinuxResourceCalculatorPlugin extends SysInfoLinux {
static final int SECTORSIZE = 4096;
long currentTime = 0;
public FakeLinuxResourceCalculatorPlugin(String procfsMemFile,
String procfsCpuFile,
String procfsStatFile,
String procfsNetFile,
String procfsDisksFile,
long jiffyLengthInMillis) {
super(procfsMemFile, procfsCpuFile, procfsStatFile, procfsNetFile,
jiffyLengthInMillis);
procfsDisksFile, jiffyLengthInMillis);
}
@Override
long getCurrentTime() {
@ -56,6 +57,10 @@ public class TestSysInfoLinux {
public void advanceTime(long adv) {
currentTime += adv * this.getJiffyLengthInMillis();
}
@Override
int readDiskBlockInformation(String diskName, int defSector) {
return SECTORSIZE;
}
}
private static final FakeLinuxResourceCalculatorPlugin plugin;
private static String TEST_ROOT_DIR = new Path(System.getProperty(
@ -64,6 +69,7 @@ public class TestSysInfoLinux {
private static final String FAKE_CPUFILE;
private static final String FAKE_STATFILE;
private static final String FAKE_NETFILE;
private static final String FAKE_DISKSFILE;
private static final long FAKE_JIFFY_LENGTH = 10L;
static {
int randomNum = (new Random()).nextInt(1000000000);
@ -71,9 +77,11 @@ public class TestSysInfoLinux {
FAKE_CPUFILE = TEST_ROOT_DIR + File.separator + "CPUINFO_" + randomNum;
FAKE_STATFILE = TEST_ROOT_DIR + File.separator + "STATINFO_" + randomNum;
FAKE_NETFILE = TEST_ROOT_DIR + File.separator + "NETINFO_" + randomNum;
FAKE_DISKSFILE = TEST_ROOT_DIR + File.separator + "DISKSINFO_" + randomNum;
plugin = new FakeLinuxResourceCalculatorPlugin(FAKE_MEMFILE, FAKE_CPUFILE,
FAKE_STATFILE,
FAKE_NETFILE,
FAKE_DISKSFILE,
FAKE_JIFFY_LENGTH);
}
static final String MEMINFO_FORMAT =
@ -157,6 +165,38 @@ public class TestSysInfoLinux {
" eth1: %d 3152521 0 0 0 0 0 219781 %d 1866290 0 0 " +
"0 0 0 0\n";
static final String DISKSINFO_FORMAT =
"1 0 ram0 0 0 0 0 0 0 0 0 0 0 0\n"+
"1 1 ram1 0 0 0 0 0 0 0 0 0 0 0\n"+
"1 2 ram2 0 0 0 0 0 0 0 0 0 0 0\n"+
"1 3 ram3 0 0 0 0 0 0 0 0 0 0 0\n"+
"1 4 ram4 0 0 0 0 0 0 0 0 0 0 0\n"+
"1 5 ram5 0 0 0 0 0 0 0 0 0 0 0\n"+
"1 6 ram6 0 0 0 0 0 0 0 0 0 0 0\n"+
"7 0 loop0 0 0 0 0 0 0 0 0 0 0 0\n"+
"7 1 loop1 0 0 0 0 0 0 0 0 0 0 0\n"+
"8 0 sda 82575678 2486518 %d 59876600 3225402 19761924 %d " +
"6407705 4 48803346 66227952\n"+
"8 1 sda1 732 289 21354 787 7 3 32 4 0 769 791"+
"8 2 sda2 744272 2206315 23605200 6742762 336830 2979630 " +
"26539520 1424776 4 1820130 8165444\n"+
"8 3 sda3 81830497 279914 17881852954 53132969 2888558 16782291 " +
"157367552 4982925 0 47077660 58061635\n"+
"8 32 sdc 10148118 693255 %d 122125461 6090515 401630172 %d 2696685590 " +
"0 26848216 2818793840\n"+
"8 33 sdc1 10147917 693230 2054138426 122125426 6090506 401630172 " +
"3261765880 2696685589 0 26848181 2818793804\n"+
"8 64 sde 9989771 553047 %d 93407551 5978572 391997273 %d 2388274325 " +
"0 24396646 2481664818\n"+
"8 65 sde1 9989570 553022 1943973346 93407489 5978563 391997273 3183807264 " +
"2388274325 0 24396584 2481666274\n"+
"8 80 sdf 10197163 693995 %d 144374395 6216644 408395438 %d 2669389056 0 " +
"26164759 2813746348\n"+
"8 81 sdf1 10196962 693970 2033452794 144374355 6216635 408395438 3316897064 " +
"2669389056 0 26164719 2813746308\n"+
"8 129 sdi1 10078602 657936 2056552626 108362198 6134036 403851153 3279882064 " +
"2639256086 0 26260432 2747601085\n";
/**
* Test parsing /proc/stat and /proc/cpuinfo
* @throws IOException
@ -358,4 +398,35 @@ public class TestSysInfoLinux {
assertEquals(plugin.getNetworkBytesWritten(), numBytesWrittenIntf1 + numBytesWrittenIntf2);
}
/**
* Test parsing /proc/diskstats
* @throws IOException
*/
@Test
public void parsingProcDisksFile() throws IOException {
long numSectorsReadsda = 1790549L; long numSectorsWrittensda = 1839071L;
long numSectorsReadsdc = 20541402L; long numSectorsWrittensdc = 32617658L;
long numSectorsReadsde = 19439751L; long numSectorsWrittensde = 31838072L;
long numSectorsReadsdf = 20334546L; long numSectorsWrittensdf = 33168970L;
File tempFile = new File(FAKE_DISKSFILE);
tempFile.deleteOnExit();
FileWriter fWriter = new FileWriter(FAKE_DISKSFILE);
fWriter.write(String.format(DISKSINFO_FORMAT,
numSectorsReadsda, numSectorsWrittensda,
numSectorsReadsdc, numSectorsWrittensdc,
numSectorsReadsde, numSectorsWrittensde,
numSectorsReadsdf, numSectorsWrittensdf));
fWriter.close();
long expectedNumSectorsRead = numSectorsReadsda + numSectorsReadsdc +
numSectorsReadsde + numSectorsReadsdf;
long expectedNumSectorsWritten = numSectorsWrittensda + numSectorsWrittensdc +
numSectorsWrittensde + numSectorsWrittensdf;
// use non-default sector size
int diskSectorSize = FakeLinuxResourceCalculatorPlugin.SECTORSIZE;
assertEquals(expectedNumSectorsRead * diskSectorSize,
plugin.getStorageBytesRead());
assertEquals(expectedNumSectorsWritten * diskSectorSize,
plugin.getStorageBytesWritten());
}
}

View File

@ -54,6 +54,12 @@ public class DummyResourceCalculatorPlugin extends ResourceCalculatorPlugin {
/** cumulative number of bytes written over the network */
public static final String NETWORK_BYTES_WRITTEN =
"mapred.tasktracker.networkwritten.testing";
/** cumulative number of bytes read from disks */
public static final String STORAGE_BYTES_READ =
"mapred.tasktracker.storageread.testing";
/** cumulative number of bytes written to disks */
public static final String STORAGE_BYTES_WRITTEN =
"mapred.tasktracker.storagewritten.testing";
/** process cumulative CPU usage time for testing */
public static final String PROC_CUMULATIVE_CPU_TIME =
"mapred.tasktracker.proccumulativecputime.testing";
@ -130,4 +136,15 @@ public class DummyResourceCalculatorPlugin extends ResourceCalculatorPlugin {
return getConf().getLong(NETWORK_BYTES_WRITTEN, -1);
}
/** {@inheritDoc} */
@Override
public long getStorageBytesRead() {
return getConf().getLong(STORAGE_BYTES_READ, -1);
}
/** {@inheritDoc} */
@Override
public long getStorageBytesWritten() {
return getConf().getLong(STORAGE_BYTES_WRITTEN, -1);
}
}

View File

@ -144,6 +144,24 @@ public class ResourceCalculatorPlugin extends Configured {
return sys.getNetworkBytesWritten();
}
/**
* Obtain the aggregated number of bytes read from disks.
*
* @return total number of bytes read.
*/
public long getStorageBytesRead() {
return sys.getStorageBytesRead();
}
/**
* Obtain the aggregated number of bytes written to disks.
*
* @return total number of bytes written.
*/
public long getStorageBytesWritten() {
return sys.getStorageBytesWritten();
}
/**
* Create the ResourceCalculatorPlugin from the class name and configure it. If
* class name is null, this method will try and return a memory calculator