HADOOP-12210. Collect network usage on the node. Contributed by Robert Grandl
(cherry picked from commit 1a0752d85a
)
This commit is contained in:
parent
753d87066f
commit
66419d3672
|
@ -192,6 +192,8 @@ Release 2.8.0 - UNRELEASED
|
||||||
HADOOP-12180. Move ResourceCalculatorPlugin from YARN to Common.
|
HADOOP-12180. Move ResourceCalculatorPlugin from YARN to Common.
|
||||||
(Chris Douglas via kasha)
|
(Chris Douglas via kasha)
|
||||||
|
|
||||||
|
HADOOP-12210. Collect network usage on the node (Robert Grandl via cdouglas)
|
||||||
|
|
||||||
OPTIMIZATIONS
|
OPTIMIZATIONS
|
||||||
|
|
||||||
HADOOP-11785. Reduce the number of listStatus operation in distcp
|
HADOOP-11785. Reduce the number of listStatus operation in distcp
|
||||||
|
|
|
@ -108,4 +108,16 @@ public abstract class SysInfo {
|
||||||
*/
|
*/
|
||||||
public abstract float getCpuUsage();
|
public abstract float getCpuUsage();
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Obtain the aggregated number of bytes read over the network.
|
||||||
|
* @return total number of bytes read.
|
||||||
|
*/
|
||||||
|
public abstract long getNetworkBytesRead();
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Obtain the aggregated number of bytes written to the network.
|
||||||
|
* @return total number of bytes written.
|
||||||
|
*/
|
||||||
|
public abstract long getNetworkBytesWritten();
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -83,9 +83,22 @@ public class SysInfoLinux extends SysInfo {
|
||||||
"[ \t]*([0-9]*)[ \t]*([0-9]*)[ \t].*");
|
"[ \t]*([0-9]*)[ \t]*([0-9]*)[ \t].*");
|
||||||
private CpuTimeTracker cpuTimeTracker;
|
private CpuTimeTracker cpuTimeTracker;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Pattern for parsing /proc/net/dev.
|
||||||
|
*/
|
||||||
|
private static final String PROCFS_NETFILE = "/proc/net/dev";
|
||||||
|
private static final Pattern PROCFS_NETFILE_FORMAT =
|
||||||
|
Pattern .compile("^[ \t]*([a-zA-Z]+[0-9]*):" +
|
||||||
|
"[ \t]*([0-9]+)[ \t]*([0-9]+)[ \t]*([0-9]+)[ \t]*([0-9]+)" +
|
||||||
|
"[ \t]*([0-9]+)[ \t]*([0-9]+)[ \t]*([0-9]+)[ \t]*([0-9]+)" +
|
||||||
|
"[ \t]*([0-9]+)[ \t]*([0-9]+)[ \t]*([0-9]+)[ \t]*([0-9]+)" +
|
||||||
|
"[ \t]*([0-9]+)[ \t]*([0-9]+)[ \t]*([0-9]+)[ \t]*([0-9]+).*");
|
||||||
|
|
||||||
|
|
||||||
private String procfsMemFile;
|
private String procfsMemFile;
|
||||||
private String procfsCpuFile;
|
private String procfsCpuFile;
|
||||||
private String procfsStatFile;
|
private String procfsStatFile;
|
||||||
|
private String procfsNetFile;
|
||||||
private long jiffyLengthInMillis;
|
private long jiffyLengthInMillis;
|
||||||
|
|
||||||
private long ramSize = 0;
|
private long ramSize = 0;
|
||||||
|
@ -98,6 +111,8 @@ public class SysInfoLinux extends SysInfo {
|
||||||
/* number of physical cores on the system. */
|
/* number of physical cores on the system. */
|
||||||
private int numCores = 0;
|
private int numCores = 0;
|
||||||
private long cpuFrequency = 0L; // CPU frequency on the system (kHz)
|
private long cpuFrequency = 0L; // CPU frequency on the system (kHz)
|
||||||
|
private long numNetBytesRead = 0L; // aggregated bytes read from network
|
||||||
|
private long numNetBytesWritten = 0L; // aggregated bytes written to network
|
||||||
|
|
||||||
private boolean readMemInfoFile = false;
|
private boolean readMemInfoFile = false;
|
||||||
private boolean readCpuInfoFile = false;
|
private boolean readCpuInfoFile = false;
|
||||||
|
@ -130,7 +145,7 @@ public class SysInfoLinux extends SysInfo {
|
||||||
|
|
||||||
public SysInfoLinux() {
|
public SysInfoLinux() {
|
||||||
this(PROCFS_MEMFILE, PROCFS_CPUINFO, PROCFS_STAT,
|
this(PROCFS_MEMFILE, PROCFS_CPUINFO, PROCFS_STAT,
|
||||||
JIFFY_LENGTH_IN_MILLIS);
|
PROCFS_NETFILE, JIFFY_LENGTH_IN_MILLIS);
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -139,16 +154,19 @@ public class SysInfoLinux extends SysInfo {
|
||||||
* @param procfsMemFile fake file for /proc/meminfo
|
* @param procfsMemFile fake file for /proc/meminfo
|
||||||
* @param procfsCpuFile fake file for /proc/cpuinfo
|
* @param procfsCpuFile fake file for /proc/cpuinfo
|
||||||
* @param procfsStatFile fake file for /proc/stat
|
* @param procfsStatFile fake file for /proc/stat
|
||||||
|
* @param procfsNetFile fake file for /proc/net/dev
|
||||||
* @param jiffyLengthInMillis fake jiffy length value
|
* @param jiffyLengthInMillis fake jiffy length value
|
||||||
*/
|
*/
|
||||||
@VisibleForTesting
|
@VisibleForTesting
|
||||||
public SysInfoLinux(String procfsMemFile,
|
public SysInfoLinux(String procfsMemFile,
|
||||||
String procfsCpuFile,
|
String procfsCpuFile,
|
||||||
String procfsStatFile,
|
String procfsStatFile,
|
||||||
|
String procfsNetFile,
|
||||||
long jiffyLengthInMillis) {
|
long jiffyLengthInMillis) {
|
||||||
this.procfsMemFile = procfsMemFile;
|
this.procfsMemFile = procfsMemFile;
|
||||||
this.procfsCpuFile = procfsCpuFile;
|
this.procfsCpuFile = procfsCpuFile;
|
||||||
this.procfsStatFile = procfsStatFile;
|
this.procfsStatFile = procfsStatFile;
|
||||||
|
this.procfsNetFile = procfsNetFile;
|
||||||
this.jiffyLengthInMillis = jiffyLengthInMillis;
|
this.jiffyLengthInMillis = jiffyLengthInMillis;
|
||||||
this.cpuTimeTracker = new CpuTimeTracker(jiffyLengthInMillis);
|
this.cpuTimeTracker = new CpuTimeTracker(jiffyLengthInMillis);
|
||||||
}
|
}
|
||||||
|
@ -338,6 +356,61 @@ public class SysInfoLinux extends SysInfo {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Read /proc/net/dev file, parse and calculate amount
|
||||||
|
* of bytes read and written through the network.
|
||||||
|
*/
|
||||||
|
private void readProcNetInfoFile() {
|
||||||
|
|
||||||
|
numNetBytesRead = 0L;
|
||||||
|
numNetBytesWritten = 0L;
|
||||||
|
|
||||||
|
// Read "/proc/net/dev" file
|
||||||
|
BufferedReader in;
|
||||||
|
InputStreamReader fReader;
|
||||||
|
try {
|
||||||
|
fReader = new InputStreamReader(
|
||||||
|
new FileInputStream(procfsNetFile), Charset.forName("UTF-8"));
|
||||||
|
in = new BufferedReader(fReader);
|
||||||
|
} catch (FileNotFoundException f) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
Matcher mat;
|
||||||
|
try {
|
||||||
|
String str = in.readLine();
|
||||||
|
while (str != null) {
|
||||||
|
mat = PROCFS_NETFILE_FORMAT.matcher(str);
|
||||||
|
if (mat.find()) {
|
||||||
|
assert mat.groupCount() >= 16;
|
||||||
|
|
||||||
|
// ignore loopback interfaces
|
||||||
|
if (mat.group(1).equals("lo")) {
|
||||||
|
str = in.readLine();
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
numNetBytesRead += Long.parseLong(mat.group(2));
|
||||||
|
numNetBytesWritten += Long.parseLong(mat.group(10));
|
||||||
|
}
|
||||||
|
str = in.readLine();
|
||||||
|
}
|
||||||
|
} catch (IOException io) {
|
||||||
|
LOG.warn("Error reading the stream " + io);
|
||||||
|
} finally {
|
||||||
|
// Close the streams
|
||||||
|
try {
|
||||||
|
fReader.close();
|
||||||
|
try {
|
||||||
|
in.close();
|
||||||
|
} catch (IOException i) {
|
||||||
|
LOG.warn("Error closing the stream " + in);
|
||||||
|
}
|
||||||
|
} catch (IOException i) {
|
||||||
|
LOG.warn("Error closing the stream " + fReader);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
/** {@inheritDoc} */
|
/** {@inheritDoc} */
|
||||||
@Override
|
@Override
|
||||||
public long getPhysicalMemorySize() {
|
public long getPhysicalMemorySize() {
|
||||||
|
@ -405,6 +478,20 @@ public class SysInfoLinux extends SysInfo {
|
||||||
return overallCpuUsage;
|
return overallCpuUsage;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/** {@inheritDoc} */
|
||||||
|
@Override
|
||||||
|
public long getNetworkBytesRead() {
|
||||||
|
readProcNetInfoFile();
|
||||||
|
return numNetBytesRead;
|
||||||
|
}
|
||||||
|
|
||||||
|
/** {@inheritDoc} */
|
||||||
|
@Override
|
||||||
|
public long getNetworkBytesWritten() {
|
||||||
|
readProcNetInfoFile();
|
||||||
|
return numNetBytesWritten;
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Test the {@link SysInfoLinux}.
|
* Test the {@link SysInfoLinux}.
|
||||||
*
|
*
|
||||||
|
@ -424,6 +511,10 @@ public class SysInfoLinux extends SysInfo {
|
||||||
System.out.println("CPU frequency (kHz) : " + plugin.getCpuFrequency());
|
System.out.println("CPU frequency (kHz) : " + plugin.getCpuFrequency());
|
||||||
System.out.println("Cumulative CPU time (ms) : " +
|
System.out.println("Cumulative CPU time (ms) : " +
|
||||||
plugin.getCumulativeCpuTime());
|
plugin.getCumulativeCpuTime());
|
||||||
|
System.out.println("Total network read (bytes) : "
|
||||||
|
+ plugin.getNetworkBytesRead());
|
||||||
|
System.out.println("Total network written (bytes) : "
|
||||||
|
+ plugin.getNetworkBytesWritten());
|
||||||
try {
|
try {
|
||||||
// Sleep so we can compute the CPU usage
|
// Sleep so we can compute the CPU usage
|
||||||
Thread.sleep(500L);
|
Thread.sleep(500L);
|
||||||
|
|
|
@ -178,4 +178,19 @@ public class SysInfoWindows extends SysInfo {
|
||||||
refreshIfNeeded();
|
refreshIfNeeded();
|
||||||
return cpuUsage;
|
return cpuUsage;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/** {@inheritDoc} */
|
||||||
|
@Override
|
||||||
|
public long getNetworkBytesRead() {
|
||||||
|
// TODO unimplemented
|
||||||
|
return 0L;
|
||||||
|
}
|
||||||
|
|
||||||
|
/** {@inheritDoc} */
|
||||||
|
@Override
|
||||||
|
public long getNetworkBytesWritten() {
|
||||||
|
// TODO unimplemented
|
||||||
|
return 0L;
|
||||||
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -44,8 +44,10 @@ public class TestSysInfoLinux {
|
||||||
public FakeLinuxResourceCalculatorPlugin(String procfsMemFile,
|
public FakeLinuxResourceCalculatorPlugin(String procfsMemFile,
|
||||||
String procfsCpuFile,
|
String procfsCpuFile,
|
||||||
String procfsStatFile,
|
String procfsStatFile,
|
||||||
|
String procfsNetFile,
|
||||||
long jiffyLengthInMillis) {
|
long jiffyLengthInMillis) {
|
||||||
super(procfsMemFile, procfsCpuFile, procfsStatFile, jiffyLengthInMillis);
|
super(procfsMemFile, procfsCpuFile, procfsStatFile, procfsNetFile,
|
||||||
|
jiffyLengthInMillis);
|
||||||
}
|
}
|
||||||
@Override
|
@Override
|
||||||
long getCurrentTime() {
|
long getCurrentTime() {
|
||||||
|
@ -61,14 +63,17 @@ public class TestSysInfoLinux {
|
||||||
private static final String FAKE_MEMFILE;
|
private static final String FAKE_MEMFILE;
|
||||||
private static final String FAKE_CPUFILE;
|
private static final String FAKE_CPUFILE;
|
||||||
private static final String FAKE_STATFILE;
|
private static final String FAKE_STATFILE;
|
||||||
|
private static final String FAKE_NETFILE;
|
||||||
private static final long FAKE_JIFFY_LENGTH = 10L;
|
private static final long FAKE_JIFFY_LENGTH = 10L;
|
||||||
static {
|
static {
|
||||||
int randomNum = (new Random()).nextInt(1000000000);
|
int randomNum = (new Random()).nextInt(1000000000);
|
||||||
FAKE_MEMFILE = TEST_ROOT_DIR + File.separator + "MEMINFO_" + randomNum;
|
FAKE_MEMFILE = TEST_ROOT_DIR + File.separator + "MEMINFO_" + randomNum;
|
||||||
FAKE_CPUFILE = TEST_ROOT_DIR + File.separator + "CPUINFO_" + randomNum;
|
FAKE_CPUFILE = TEST_ROOT_DIR + File.separator + "CPUINFO_" + randomNum;
|
||||||
FAKE_STATFILE = TEST_ROOT_DIR + File.separator + "STATINFO_" + randomNum;
|
FAKE_STATFILE = TEST_ROOT_DIR + File.separator + "STATINFO_" + randomNum;
|
||||||
|
FAKE_NETFILE = TEST_ROOT_DIR + File.separator + "NETINFO_" + randomNum;
|
||||||
plugin = new FakeLinuxResourceCalculatorPlugin(FAKE_MEMFILE, FAKE_CPUFILE,
|
plugin = new FakeLinuxResourceCalculatorPlugin(FAKE_MEMFILE, FAKE_CPUFILE,
|
||||||
FAKE_STATFILE,
|
FAKE_STATFILE,
|
||||||
|
FAKE_NETFILE,
|
||||||
FAKE_JIFFY_LENGTH);
|
FAKE_JIFFY_LENGTH);
|
||||||
}
|
}
|
||||||
static final String MEMINFO_FORMAT =
|
static final String MEMINFO_FORMAT =
|
||||||
|
@ -141,6 +146,17 @@ public class TestSysInfoLinux {
|
||||||
"procs_running 1\n" +
|
"procs_running 1\n" +
|
||||||
"procs_blocked 0\n";
|
"procs_blocked 0\n";
|
||||||
|
|
||||||
|
static final String NETINFO_FORMAT =
|
||||||
|
"Inter-| Receive | Transmit\n"+
|
||||||
|
"face |bytes packets errs drop fifo frame compressed multicast|bytes packets"+
|
||||||
|
"errs drop fifo colls carrier compressed\n"+
|
||||||
|
" lo: 42236310 563003 0 0 0 0 0 0 42236310 563003 " +
|
||||||
|
"0 0 0 0 0 0\n"+
|
||||||
|
" eth0: %d 3452527 0 0 0 0 0 299787 %d 1866280 0 0 " +
|
||||||
|
"0 0 0 0\n"+
|
||||||
|
" eth1: %d 3152521 0 0 0 0 0 219781 %d 1866290 0 0 " +
|
||||||
|
"0 0 0 0\n";
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Test parsing /proc/stat and /proc/cpuinfo
|
* Test parsing /proc/stat and /proc/cpuinfo
|
||||||
* @throws IOException
|
* @throws IOException
|
||||||
|
@ -320,4 +336,26 @@ public class TestSysInfoLinux {
|
||||||
IOUtils.closeQuietly(fWriter);
|
IOUtils.closeQuietly(fWriter);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Test parsing /proc/net/dev
|
||||||
|
* @throws IOException
|
||||||
|
*/
|
||||||
|
@Test
|
||||||
|
public void parsingProcNetFile() throws IOException {
|
||||||
|
long numBytesReadIntf1 = 2097172468L;
|
||||||
|
long numBytesWrittenIntf1 = 1355620114L;
|
||||||
|
long numBytesReadIntf2 = 1097172460L;
|
||||||
|
long numBytesWrittenIntf2 = 1055620110L;
|
||||||
|
File tempFile = new File(FAKE_NETFILE);
|
||||||
|
tempFile.deleteOnExit();
|
||||||
|
FileWriter fWriter = new FileWriter(FAKE_NETFILE);
|
||||||
|
fWriter.write(String.format(NETINFO_FORMAT,
|
||||||
|
numBytesReadIntf1, numBytesWrittenIntf1,
|
||||||
|
numBytesReadIntf2, numBytesWrittenIntf2));
|
||||||
|
fWriter.close();
|
||||||
|
assertEquals(plugin.getNetworkBytesRead(), numBytesReadIntf1 + numBytesReadIntf2);
|
||||||
|
assertEquals(plugin.getNetworkBytesWritten(), numBytesWrittenIntf1 + numBytesWrittenIntf2);
|
||||||
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -48,6 +48,12 @@ public class DummyResourceCalculatorPlugin extends ResourceCalculatorPlugin {
|
||||||
"mapred.tasktracker.cumulativecputime.testing";
|
"mapred.tasktracker.cumulativecputime.testing";
|
||||||
/** CPU usage percentage for testing */
|
/** CPU usage percentage for testing */
|
||||||
public static final String CPU_USAGE = "mapred.tasktracker.cpuusage.testing";
|
public static final String CPU_USAGE = "mapred.tasktracker.cpuusage.testing";
|
||||||
|
/** cumulative number of bytes read over the network */
|
||||||
|
public static final String NETWORK_BYTES_READ =
|
||||||
|
"mapred.tasktracker.networkread.testing";
|
||||||
|
/** cumulative number of bytes written over the network */
|
||||||
|
public static final String NETWORK_BYTES_WRITTEN =
|
||||||
|
"mapred.tasktracker.networkwritten.testing";
|
||||||
/** process cumulative CPU usage time for testing */
|
/** process cumulative CPU usage time for testing */
|
||||||
public static final String PROC_CUMULATIVE_CPU_TIME =
|
public static final String PROC_CUMULATIVE_CPU_TIME =
|
||||||
"mapred.tasktracker.proccumulativecputime.testing";
|
"mapred.tasktracker.proccumulativecputime.testing";
|
||||||
|
@ -111,4 +117,17 @@ public class DummyResourceCalculatorPlugin extends ResourceCalculatorPlugin {
|
||||||
public float getCpuUsage() {
|
public float getCpuUsage() {
|
||||||
return getConf().getFloat(CPU_USAGE, -1);
|
return getConf().getFloat(CPU_USAGE, -1);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/** {@inheritDoc} */
|
||||||
|
@Override
|
||||||
|
public long getNetworkBytesRead() {
|
||||||
|
return getConf().getLong(NETWORK_BYTES_READ, -1);
|
||||||
|
}
|
||||||
|
|
||||||
|
/** {@inheritDoc} */
|
||||||
|
@Override
|
||||||
|
public long getNetworkBytesWritten() {
|
||||||
|
return getConf().getLong(NETWORK_BYTES_WRITTEN, -1);
|
||||||
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -124,6 +124,22 @@ public class ResourceCalculatorPlugin extends Configured {
|
||||||
return sys.getCpuUsage();
|
return sys.getCpuUsage();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Obtain the aggregated number of bytes read over the network.
|
||||||
|
* @return total number of bytes read.
|
||||||
|
*/
|
||||||
|
public long getNetworkBytesRead() {
|
||||||
|
return sys.getNetworkBytesRead();
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Obtain the aggregated number of bytes written to the network.
|
||||||
|
* @return total number of bytes written.
|
||||||
|
*/
|
||||||
|
public long getNetworkBytesWritten() {
|
||||||
|
return sys.getNetworkBytesWritten();
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Create the ResourceCalculatorPlugin from the class name and configure it. If
|
* Create the ResourceCalculatorPlugin from the class name and configure it. If
|
||||||
* class name is null, this method will try and return a memory calculator
|
* class name is null, this method will try and return a memory calculator
|
||||||
|
|
Loading…
Reference in New Issue