HADOOP-12824. Collect network and disk usage on the node running Windows. Contributed by Inigo Goiri.

This commit is contained in:
Xiaoyu Yao 2016-02-25 15:46:53 -08:00
parent 27edd164a0
commit 6e9a4c598d
4 changed files with 204 additions and 17 deletions

View File

@ -426,6 +426,9 @@ Release 2.8.0 - UNRELEASED
HADOOP-12535. Run FileSystem contract tests with hadoop-azure. HADOOP-12535. Run FileSystem contract tests with hadoop-azure.
(Madhumita Chakraborty via cnauroth) (Madhumita Chakraborty via cnauroth)
HADOOP-12824. Collect network and disk usage on the node running Windows.
(Inigo Goiri via xyao)
OPTIMIZATIONS OPTIMIZATIONS
HADOOP-11785. Reduce the number of listStatus operation in distcp HADOOP-11785. Reduce the number of listStatus operation in distcp

View File

@ -44,6 +44,10 @@ public class SysInfoWindows extends SysInfo {
private long cpuFrequencyKhz; private long cpuFrequencyKhz;
private long cumulativeCpuTimeMs; private long cumulativeCpuTimeMs;
private float cpuUsage; private float cpuUsage;
private long storageBytesRead;
private long storageBytesWritten;
private long netBytesRead;
private long netBytesWritten;
private long lastRefreshTime; private long lastRefreshTime;
static final int REFRESH_INTERVAL_MS = 1000; static final int REFRESH_INTERVAL_MS = 1000;
@ -67,6 +71,10 @@ public class SysInfoWindows extends SysInfo {
cpuFrequencyKhz = -1; cpuFrequencyKhz = -1;
cumulativeCpuTimeMs = -1; cumulativeCpuTimeMs = -1;
cpuUsage = -1; cpuUsage = -1;
storageBytesRead = -1;
storageBytesWritten = -1;
netBytesRead = -1;
netBytesWritten = -1;
} }
String getSystemInfoInfoFromShell() { String getSystemInfoInfoFromShell() {
@ -91,7 +99,7 @@ public class SysInfoWindows extends SysInfo {
reset(); reset();
String sysInfoStr = getSystemInfoInfoFromShell(); String sysInfoStr = getSystemInfoInfoFromShell();
if (sysInfoStr != null) { if (sysInfoStr != null) {
final int sysInfoSplitCount = 7; final int sysInfoSplitCount = 11;
String[] sysInfo = sysInfoStr.substring(0, sysInfoStr.indexOf("\r\n")) String[] sysInfo = sysInfoStr.substring(0, sysInfoStr.indexOf("\r\n"))
.split(","); .split(",");
if (sysInfo.length == sysInfoSplitCount) { if (sysInfo.length == sysInfoSplitCount) {
@ -103,6 +111,10 @@ public class SysInfoWindows extends SysInfo {
numProcessors = Integer.parseInt(sysInfo[4]); numProcessors = Integer.parseInt(sysInfo[4]);
cpuFrequencyKhz = Long.parseLong(sysInfo[5]); cpuFrequencyKhz = Long.parseLong(sysInfo[5]);
cumulativeCpuTimeMs = Long.parseLong(sysInfo[6]); cumulativeCpuTimeMs = Long.parseLong(sysInfo[6]);
storageBytesRead = Long.parseLong(sysInfo[7]);
storageBytesWritten = Long.parseLong(sysInfo[8]);
netBytesRead = Long.parseLong(sysInfo[9]);
netBytesWritten = Long.parseLong(sysInfo[10]);
if (lastCumCpuTimeMs != -1) { if (lastCumCpuTimeMs != -1) {
/** /**
* This number will be the aggregated usage across all cores in * This number will be the aggregated usage across all cores in
@ -203,27 +215,27 @@ public class SysInfoWindows extends SysInfo {
/** {@inheritDoc} */ /** {@inheritDoc} */
@Override @Override
public long getNetworkBytesRead() { public long getNetworkBytesRead() {
// TODO unimplemented refreshIfNeeded();
return 0L; return netBytesRead;
} }
/** {@inheritDoc} */ /** {@inheritDoc} */
@Override @Override
public long getNetworkBytesWritten() { public long getNetworkBytesWritten() {
// TODO unimplemented refreshIfNeeded();
return 0L; return netBytesWritten;
} }
@Override @Override
public long getStorageBytesRead() { public long getStorageBytesRead() {
// TODO unimplemented refreshIfNeeded();
return 0L; return storageBytesRead;
} }
@Override @Override
public long getStorageBytesWritten() { public long getStorageBytesWritten() {
// TODO unimplemented refreshIfNeeded();
return 0L; return storageBytesWritten;
} }
} }

View File

@ -18,6 +18,8 @@
#include "winutils.h" #include "winutils.h"
#include <psapi.h> #include <psapi.h>
#include <PowrProf.h> #include <PowrProf.h>
#include <pdh.h>
#include <pdhmsg.h>
#ifdef PSAPI_VERSION #ifdef PSAPI_VERSION
#undef PSAPI_VERSION #undef PSAPI_VERSION
@ -25,6 +27,12 @@
#define PSAPI_VERSION 1 #define PSAPI_VERSION 1
#pragma comment(lib, "psapi.lib") #pragma comment(lib, "psapi.lib")
#pragma comment(lib, "Powrprof.lib") #pragma comment(lib, "Powrprof.lib")
#pragma comment(lib, "pdh.lib")
CONST PWSTR COUNTER_PATH_NET_READ_ALL = L"\\Network Interface(*)\\Bytes Received/Sec";
CONST PWSTR COUNTER_PATH_NET_WRITE_ALL = L"\\Network Interface(*)\\Bytes Sent/Sec";
CONST PWSTR COUNTER_PATH_DISK_READ_ALL = L"\\LogicalDisk(*)\\Disk Read Bytes/sec";
CONST PWSTR COUNTER_PATH_DISK_WRITE_ALL = L"\\LogicalDisk(*)\\Disk Write Bytes/sec";
typedef struct _PROCESSOR_POWER_INFORMATION { typedef struct _PROCESSOR_POWER_INFORMATION {
ULONG Number; ULONG Number;
@ -57,6 +65,7 @@ int SystemInfo()
PROCESSOR_POWER_INFORMATION const *ppi; PROCESSOR_POWER_INFORMATION const *ppi;
ULONGLONG cpuFrequencyKhz; ULONGLONG cpuFrequencyKhz;
NTSTATUS status; NTSTATUS status;
LONGLONG diskRead, diskWrite, netRead, netWrite;
ZeroMemory(&memInfo, sizeof(PERFORMANCE_INFORMATION)); ZeroMemory(&memInfo, sizeof(PERFORMANCE_INFORMATION));
memInfo.cb = sizeof(PERFORMANCE_INFORMATION); memInfo.cb = sizeof(PERFORMANCE_INFORMATION);
@ -105,8 +114,16 @@ int SystemInfo()
cpuFrequencyKhz = ppi->MaxMhz*1000; cpuFrequencyKhz = ppi->MaxMhz*1000;
LocalFree(pBuffer); LocalFree(pBuffer);
fwprintf_s(stdout, L"%Iu,%Iu,%Iu,%Iu,%u,%I64u,%I64u\n", vmemSize, memSize, status = GetDiskAndNetwork(&diskRead, &diskWrite, &netRead, &netWrite);
vmemFree, memFree, sysInfo.dwNumberOfProcessors, cpuFrequencyKhz, cpuTimeMs); if(0 != status)
{
fwprintf_s(stderr, L"Error in GetDiskAndNetwork. Err:%d\n", status);
return EXIT_FAILURE;
}
fwprintf_s(stdout, L"%Iu,%Iu,%Iu,%Iu,%u,%I64u,%I64u,%I64d,%I64d,%I64d,%I64d\n", vmemSize, memSize,
vmemFree, memFree, sysInfo.dwNumberOfProcessors, cpuFrequencyKhz, cpuTimeMs,
diskRead, diskWrite, netRead, netWrite);
return EXIT_SUCCESS; return EXIT_SUCCESS;
} }
@ -120,5 +137,151 @@ void SystemInfoUsage()
VirtualMemorySize(bytes),PhysicalMemorySize(bytes),\n\ VirtualMemorySize(bytes),PhysicalMemorySize(bytes),\n\
FreeVirtualMemory(bytes),FreePhysicalMemory(bytes),\n\ FreeVirtualMemory(bytes),FreePhysicalMemory(bytes),\n\
NumberOfProcessors,CpuFrequency(Khz),\n\ NumberOfProcessors,CpuFrequency(Khz),\n\
CpuTime(MilliSec,Kernel+User)\n"); CpuTime(MilliSec,Kernel+User),\n\
DiskRead(bytes),DiskWrite(bytes),\n\
NetworkRead(bytes),NetworkWrite(bytes)\n");
}
int GetDiskAndNetwork(LONGLONG* diskRead, LONGLONG* diskWrite, LONGLONG* netRead, LONGLONG* netWrite)
{
int ret = EXIT_SUCCESS;
PDH_STATUS status = ERROR_SUCCESS;
PDH_HQUERY hQuery = NULL;
PDH_HCOUNTER hCounterNetRead = NULL;
PDH_HCOUNTER hCounterNetWrite = NULL;
PDH_HCOUNTER hCounterDiskRead = NULL;
PDH_HCOUNTER hCounterDiskWrite = NULL;
DWORD i;
if(status = PdhOpenQuery(NULL, 0, &hQuery))
{
fwprintf_s(stderr, L"PdhOpenQuery failed with 0x%x.\n", status);
ret = EXIT_FAILURE;
goto cleanup;
}
// Add each one of the counters with wild cards
if(status = PdhAddCounter(hQuery, COUNTER_PATH_NET_READ_ALL, 0, &hCounterNetRead))
{
fwprintf_s(stderr, L"PdhAddCounter %s failed with 0x%x.\n", COUNTER_PATH_NET_READ_ALL, status);
ret = EXIT_FAILURE;
goto cleanup;
}
if(status = PdhAddCounter(hQuery, COUNTER_PATH_NET_WRITE_ALL, 0, &hCounterNetWrite))
{
fwprintf_s(stderr, L"PdhAddCounter %s failed with 0x%x.\n", COUNTER_PATH_NET_WRITE_ALL, status);
ret = EXIT_FAILURE;
goto cleanup;
}
if(status = PdhAddCounter(hQuery, COUNTER_PATH_DISK_READ_ALL, 0, &hCounterDiskRead))
{
fwprintf_s(stderr, L"PdhAddCounter %s failed with 0x%x.\n", COUNTER_PATH_DISK_READ_ALL, status);
ret = EXIT_FAILURE;
goto cleanup;
}
if(status = PdhAddCounter(hQuery, COUNTER_PATH_DISK_WRITE_ALL, 0, &hCounterDiskWrite))
{
fwprintf_s(stderr, L"PdhAddCounter %s failed with 0x%x.\n", COUNTER_PATH_DISK_WRITE_ALL, status);
ret = EXIT_FAILURE;
goto cleanup;
}
if(status = PdhCollectQueryData(hQuery))
{
fwprintf_s(stderr, L"PdhCollectQueryData() failed with 0x%x.\n", status);
ret = EXIT_FAILURE;
goto cleanup;
}
// Read and aggregate counters
status = ReadTotalCounter(hCounterNetRead, netRead);
if(ERROR_SUCCESS != status)
{
fwprintf_s(stderr, L"ReadTotalCounter(Network Read): Error 0x%x.\n", status);
ret = EXIT_FAILURE;
}
status = ReadTotalCounter(hCounterNetWrite, netWrite);
if(ERROR_SUCCESS != status)
{
fwprintf_s(stderr, L"ReadTotalCounter(Network Write): Error 0x%x.\n", status);
ret = EXIT_FAILURE;
}
status = ReadTotalCounter(hCounterDiskRead, diskRead);
if(ERROR_SUCCESS != status)
{
fwprintf_s(stderr, L"ReadTotalCounter(Disk Read): Error 0x%x.\n", status);
ret = EXIT_FAILURE;
}
status = ReadTotalCounter(hCounterDiskWrite, diskWrite);
if(ERROR_SUCCESS != status)
{
fwprintf_s(stderr, L"ReadTotalCounter(Disk Write): Error 0x%x.\n", status);
ret = EXIT_FAILURE;
}
cleanup:
if (hQuery)
{
status = PdhCloseQuery(hQuery);
}
return ret;
}
PDH_STATUS ReadTotalCounter(PDH_HCOUNTER hCounter, LONGLONG* ret)
{
PDH_STATUS status = ERROR_SUCCESS;
DWORD i = 0;
DWORD dwBufferSize = 0;
DWORD dwItemCount = 0;
PDH_RAW_COUNTER_ITEM *pItems = NULL;
// Initialize output
*ret = 0;
// Get the required size of the pItems buffer
status = PdhGetRawCounterArray(hCounter, &dwBufferSize, &dwItemCount, NULL);
if (PDH_MORE_DATA == status)
{
pItems = (PDH_RAW_COUNTER_ITEM *) malloc(dwBufferSize);
if (pItems)
{
// Actually query the counter
status = PdhGetRawCounterArray(hCounter, &dwBufferSize, &dwItemCount, pItems);
if (ERROR_SUCCESS == status) {
for (i = 0; i < dwItemCount; i++) {
if (wcscmp(L"_Total", pItems[i].szName) == 0) {
*ret = pItems[i].RawValue.FirstValue;
break;
} else {
*ret += pItems[i].RawValue.FirstValue;
}
}
} else {
*ret = -1;
goto cleanup;
}
// Reset structures
free(pItems);
pItems = NULL;
dwBufferSize = dwItemCount = 0;
} else {
*ret = -1;
status = PDH_NO_DATA;
goto cleanup;
}
} else {
*ret = -1;
goto cleanup;
}
cleanup:
if (pItems) {
free(pItems);
}
return status;
} }

View File

@ -47,7 +47,8 @@ public class TestSysInfoWindows {
public void parseSystemInfoString() { public void parseSystemInfoString() {
SysInfoWindowsMock tester = new SysInfoWindowsMock(); SysInfoWindowsMock tester = new SysInfoWindowsMock();
tester.setSysinfoString( tester.setSysinfoString(
"17177038848,8589467648,15232745472,6400417792,1,2805000,6261812\r\n"); "17177038848,8589467648,15232745472,6400417792,1,2805000,6261812," +
"1234567,2345678,3456789,4567890\r\n");
// info str derived from windows shell command has \r\n termination // info str derived from windows shell command has \r\n termination
assertEquals(17177038848L, tester.getVirtualMemorySize()); assertEquals(17177038848L, tester.getVirtualMemorySize());
assertEquals(8589467648L, tester.getPhysicalMemorySize()); assertEquals(8589467648L, tester.getPhysicalMemorySize());
@ -57,6 +58,10 @@ public class TestSysInfoWindows {
assertEquals(1, tester.getNumCores()); assertEquals(1, tester.getNumCores());
assertEquals(2805000L, tester.getCpuFrequency()); assertEquals(2805000L, tester.getCpuFrequency());
assertEquals(6261812L, tester.getCumulativeCpuTime()); assertEquals(6261812L, tester.getCumulativeCpuTime());
assertEquals(1234567L, tester.getStorageBytesRead());
assertEquals(2345678L, tester.getStorageBytesWritten());
assertEquals(3456789L, tester.getNetworkBytesRead());
assertEquals(4567890L, tester.getNetworkBytesWritten());
// undef on first call // undef on first call
assertEquals((float)CpuTimeTracker.UNAVAILABLE, assertEquals((float)CpuTimeTracker.UNAVAILABLE,
tester.getCpuUsagePercentage(), 0.0); tester.getCpuUsagePercentage(), 0.0);
@ -68,7 +73,8 @@ public class TestSysInfoWindows {
public void refreshAndCpuUsage() throws InterruptedException { public void refreshAndCpuUsage() throws InterruptedException {
SysInfoWindowsMock tester = new SysInfoWindowsMock(); SysInfoWindowsMock tester = new SysInfoWindowsMock();
tester.setSysinfoString( tester.setSysinfoString(
"17177038848,8589467648,15232745472,6400417792,1,2805000,6261812\r\n"); "17177038848,8589467648,15232745472,6400417792,1,2805000,6261812," +
"1234567,2345678,3456789,4567890\r\n");
// info str derived from windows shell command has \r\n termination // info str derived from windows shell command has \r\n termination
tester.getAvailablePhysicalMemorySize(); tester.getAvailablePhysicalMemorySize();
// verify information has been refreshed // verify information has been refreshed
@ -79,7 +85,8 @@ public class TestSysInfoWindows {
tester.getNumVCoresUsed(), 0.0); tester.getNumVCoresUsed(), 0.0);
tester.setSysinfoString( tester.setSysinfoString(
"17177038848,8589467648,15232745472,5400417792,1,2805000,6263012\r\n"); "17177038848,8589467648,15232745472,5400417792,1,2805000,6263012," +
"1234567,2345678,3456789,4567890\r\n");
tester.getAvailablePhysicalMemorySize(); tester.getAvailablePhysicalMemorySize();
// verify information has not been refreshed // verify information has not been refreshed
assertEquals(6400417792L, tester.getAvailablePhysicalMemorySize()); assertEquals(6400417792L, tester.getAvailablePhysicalMemorySize());
@ -106,12 +113,14 @@ public class TestSysInfoWindows {
// test with 12 cores // test with 12 cores
SysInfoWindowsMock tester = new SysInfoWindowsMock(); SysInfoWindowsMock tester = new SysInfoWindowsMock();
tester.setSysinfoString( tester.setSysinfoString(
"17177038848,8589467648,15232745472,6400417792,12,2805000,6261812\r\n"); "17177038848,8589467648,15232745472,6400417792,12,2805000,6261812," +
"1234567,2345678,3456789,4567890\r\n");
// verify information has been refreshed // verify information has been refreshed
assertEquals(6400417792L, tester.getAvailablePhysicalMemorySize()); assertEquals(6400417792L, tester.getAvailablePhysicalMemorySize());
tester.setSysinfoString( tester.setSysinfoString(
"17177038848,8589467648,15232745472,5400417792,12,2805000,6263012\r\n"); "17177038848,8589467648,15232745472,5400417792,12,2805000,6263012," +
"1234567,2345678,3456789,4567890\r\n");
// verify information has not been refreshed // verify information has not been refreshed
assertEquals(6400417792L, tester.getAvailablePhysicalMemorySize()); assertEquals(6400417792L, tester.getAvailablePhysicalMemorySize());