Merge r1470195 through r1470759 from trunk.
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/HDFS-2802@1470760 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
commit
aa7dd50c66
|
@ -517,6 +517,9 @@ Trunk (Unreleased)
|
||||||
that POSIX errno is embedded in NativeIOException. (Chris Nauroth via
|
that POSIX errno is embedded in NativeIOException. (Chris Nauroth via
|
||||||
suresh)
|
suresh)
|
||||||
|
|
||||||
|
HADOOP-9443. Port winutils static code analysis change to trunk.
|
||||||
|
(Chuan Liu via suresh)
|
||||||
|
|
||||||
Release 2.0.5-beta - UNRELEASED
|
Release 2.0.5-beta - UNRELEASED
|
||||||
|
|
||||||
INCOMPATIBLE CHANGES
|
INCOMPATIBLE CHANGES
|
||||||
|
|
|
@ -74,18 +74,22 @@ static BOOL ParseMode(LPCWSTR modeString, PMODE_CHANGE_ACTION *actions);
|
||||||
|
|
||||||
static BOOL FreeActions(PMODE_CHANGE_ACTION actions);
|
static BOOL FreeActions(PMODE_CHANGE_ACTION actions);
|
||||||
|
|
||||||
static BOOL ParseCommandLineArguments(__in int argc, __in wchar_t *argv[],
|
static BOOL ParseCommandLineArguments(
|
||||||
__out BOOL *rec, __out_opt INT *mask,
|
__in int argc,
|
||||||
__out_opt PMODE_CHANGE_ACTION *actions, __out LPCWSTR *path);
|
__in_ecount(argc) wchar_t *argv[],
|
||||||
|
__out BOOL *rec,
|
||||||
|
__out_opt INT *mask,
|
||||||
|
__out_opt PMODE_CHANGE_ACTION *actions,
|
||||||
|
__out LPCWSTR *path);
|
||||||
|
|
||||||
static BOOL ChangeFileModeByActions(__in LPCWSTR path,
|
static BOOL ChangeFileModeByActions(__in LPCWSTR path,
|
||||||
PMODE_CHANGE_ACTION actions);
|
MODE_CHANGE_ACTION const *actions);
|
||||||
|
|
||||||
static BOOL ChangeFileMode(__in LPCWSTR path, __in_opt INT mode,
|
static BOOL ChangeFileMode(__in LPCWSTR path, __in_opt INT mode,
|
||||||
__in_opt PMODE_CHANGE_ACTION actions);
|
__in_opt MODE_CHANGE_ACTION const *actions);
|
||||||
|
|
||||||
static BOOL ChangeFileModeRecursively(__in LPCWSTR path, __in_opt INT mode,
|
static BOOL ChangeFileModeRecursively(__in LPCWSTR path, __in_opt INT mode,
|
||||||
__in_opt PMODE_CHANGE_ACTION actions);
|
__in_opt MODE_CHANGE_ACTION const *actions);
|
||||||
|
|
||||||
|
|
||||||
//----------------------------------------------------------------------------
|
//----------------------------------------------------------------------------
|
||||||
|
@ -99,7 +103,7 @@ static BOOL ChangeFileModeRecursively(__in LPCWSTR path, __in_opt INT mode,
|
||||||
//
|
//
|
||||||
// Notes:
|
// Notes:
|
||||||
//
|
//
|
||||||
int Chmod(int argc, wchar_t *argv[])
|
int Chmod(__in int argc, __in_ecount(argc) wchar_t *argv[])
|
||||||
{
|
{
|
||||||
LPWSTR pathName = NULL;
|
LPWSTR pathName = NULL;
|
||||||
LPWSTR longPathName = NULL;
|
LPWSTR longPathName = NULL;
|
||||||
|
@ -169,7 +173,7 @@ ChmodEnd:
|
||||||
// Notes:
|
// Notes:
|
||||||
//
|
//
|
||||||
static BOOL ChangeFileMode(__in LPCWSTR path, __in_opt INT unixAccessMask,
|
static BOOL ChangeFileMode(__in LPCWSTR path, __in_opt INT unixAccessMask,
|
||||||
__in_opt PMODE_CHANGE_ACTION actions)
|
__in_opt MODE_CHANGE_ACTION const *actions)
|
||||||
{
|
{
|
||||||
if (actions != NULL)
|
if (actions != NULL)
|
||||||
return ChangeFileModeByActions(path, actions);
|
return ChangeFileModeByActions(path, actions);
|
||||||
|
@ -202,7 +206,7 @@ static BOOL ChangeFileMode(__in LPCWSTR path, __in_opt INT unixAccessMask,
|
||||||
// - Otherwise, call the method on all its children, then change its mode.
|
// - Otherwise, call the method on all its children, then change its mode.
|
||||||
//
|
//
|
||||||
static BOOL ChangeFileModeRecursively(__in LPCWSTR path, __in_opt INT mode,
|
static BOOL ChangeFileModeRecursively(__in LPCWSTR path, __in_opt INT mode,
|
||||||
__in_opt PMODE_CHANGE_ACTION actions)
|
__in_opt MODE_CHANGE_ACTION const *actions)
|
||||||
{
|
{
|
||||||
BOOL isDir = FALSE;
|
BOOL isDir = FALSE;
|
||||||
BOOL isSymlink = FALSE;
|
BOOL isSymlink = FALSE;
|
||||||
|
@ -335,7 +339,9 @@ ChangeFileModeRecursivelyEnd:
|
||||||
// 1. Recursive is only set on directories
|
// 1. Recursive is only set on directories
|
||||||
// 2. 'actions' is NULL if the mode is octal
|
// 2. 'actions' is NULL if the mode is octal
|
||||||
//
|
//
|
||||||
static BOOL ParseCommandLineArguments(__in int argc, __in wchar_t *argv[],
|
static BOOL ParseCommandLineArguments(
|
||||||
|
__in int argc,
|
||||||
|
__in_ecount(argc) wchar_t *argv[],
|
||||||
__out BOOL *rec,
|
__out BOOL *rec,
|
||||||
__out_opt INT *mask,
|
__out_opt INT *mask,
|
||||||
__out_opt PMODE_CHANGE_ACTION *actions,
|
__out_opt PMODE_CHANGE_ACTION *actions,
|
||||||
|
@ -551,9 +557,9 @@ static INT ComputeNewMode(__in INT oldMode,
|
||||||
// none
|
// none
|
||||||
//
|
//
|
||||||
static BOOL ConvertActionsToMask(__in LPCWSTR path,
|
static BOOL ConvertActionsToMask(__in LPCWSTR path,
|
||||||
__in PMODE_CHANGE_ACTION actions, __out PINT puMask)
|
__in MODE_CHANGE_ACTION const *actions, __out PINT puMask)
|
||||||
{
|
{
|
||||||
PMODE_CHANGE_ACTION curr = NULL;
|
MODE_CHANGE_ACTION const *curr = NULL;
|
||||||
|
|
||||||
BY_HANDLE_FILE_INFORMATION fileInformation;
|
BY_HANDLE_FILE_INFORMATION fileInformation;
|
||||||
DWORD dwErrorCode = ERROR_SUCCESS;
|
DWORD dwErrorCode = ERROR_SUCCESS;
|
||||||
|
@ -608,7 +614,7 @@ static BOOL ConvertActionsToMask(__in LPCWSTR path,
|
||||||
// none
|
// none
|
||||||
//
|
//
|
||||||
static BOOL ChangeFileModeByActions(__in LPCWSTR path,
|
static BOOL ChangeFileModeByActions(__in LPCWSTR path,
|
||||||
PMODE_CHANGE_ACTION actions)
|
MODE_CHANGE_ACTION const *actions)
|
||||||
{
|
{
|
||||||
INT mask = 0;
|
INT mask = 0;
|
||||||
|
|
||||||
|
@ -769,10 +775,14 @@ static BOOL ParseMode(LPCWSTR modeString, PMODE_CHANGE_ACTION *pActions)
|
||||||
switch (c)
|
switch (c)
|
||||||
{
|
{
|
||||||
case NULL:
|
case NULL:
|
||||||
|
__fallthrough;
|
||||||
case L',':
|
case L',':
|
||||||
i++;
|
i++;
|
||||||
|
__fallthrough;
|
||||||
case L'+':
|
case L'+':
|
||||||
|
__fallthrough;
|
||||||
case L'-':
|
case L'-':
|
||||||
|
__fallthrough;
|
||||||
case L'=':
|
case L'=':
|
||||||
state = PARSE_MODE_ACTION_WHO_STATE;
|
state = PARSE_MODE_ACTION_WHO_STATE;
|
||||||
|
|
||||||
|
|
|
@ -116,13 +116,13 @@ ChangeFileOwnerByNameEnd:
|
||||||
// Notes:
|
// Notes:
|
||||||
//
|
//
|
||||||
//
|
//
|
||||||
int Chown(int argc, wchar_t *argv[])
|
int Chown(__in int argc, __in_ecount(argc) wchar_t *argv[])
|
||||||
{
|
{
|
||||||
LPWSTR pathName = NULL;
|
LPWSTR pathName = NULL;
|
||||||
|
|
||||||
LPWSTR ownerInfo = NULL;
|
LPWSTR ownerInfo = NULL;
|
||||||
|
|
||||||
LPWSTR colonPos = NULL;
|
WCHAR const * colonPos = NULL;
|
||||||
|
|
||||||
LPWSTR userName = NULL;
|
LPWSTR userName = NULL;
|
||||||
size_t userNameLen = 0;
|
size_t userNameLen = 0;
|
||||||
|
|
|
@ -32,12 +32,12 @@
|
||||||
// otherwise, space.
|
// otherwise, space.
|
||||||
//
|
//
|
||||||
static BOOL PrintGroups(
|
static BOOL PrintGroups(
|
||||||
LPLOCALGROUP_USERS_INFO_0 groups,
|
LOCALGROUP_USERS_INFO_0 const *groups,
|
||||||
DWORD entries,
|
DWORD entries,
|
||||||
BOOL formatOutput)
|
BOOL formatOutput)
|
||||||
{
|
{
|
||||||
BOOL ret = TRUE;
|
BOOL ret = TRUE;
|
||||||
LPLOCALGROUP_USERS_INFO_0 pTmpBuf = groups;
|
LOCALGROUP_USERS_INFO_0 const *pTmpBuf = groups;
|
||||||
DWORD i;
|
DWORD i;
|
||||||
|
|
||||||
for (i = 0; i < entries; i++)
|
for (i = 0; i < entries; i++)
|
||||||
|
@ -80,7 +80,10 @@ static BOOL PrintGroups(
|
||||||
// TRUE on the valid command line, FALSE otherwise
|
// TRUE on the valid command line, FALSE otherwise
|
||||||
//
|
//
|
||||||
static BOOL ParseCommandLine(
|
static BOOL ParseCommandLine(
|
||||||
int argc, wchar_t *argv[], wchar_t **user, BOOL *formatOutput)
|
__in int argc,
|
||||||
|
__in_ecount(argc) wchar_t *argv[],
|
||||||
|
__out PWSTR *user,
|
||||||
|
__out BOOL *formatOutput)
|
||||||
{
|
{
|
||||||
*formatOutput = FALSE;
|
*formatOutput = FALSE;
|
||||||
|
|
||||||
|
@ -132,7 +135,7 @@ static BOOL ParseCommandLine(
|
||||||
// Notes:
|
// Notes:
|
||||||
//
|
//
|
||||||
//
|
//
|
||||||
int Groups(int argc, wchar_t *argv[])
|
int Groups(__in int argc, __in_ecount(argc) wchar_t *argv[])
|
||||||
{
|
{
|
||||||
LPWSTR input = NULL;
|
LPWSTR input = NULL;
|
||||||
|
|
||||||
|
|
|
@ -37,7 +37,7 @@ typedef enum HardLinkCommandOptionType
|
||||||
// TRUE: If the command line is valid
|
// TRUE: If the command line is valid
|
||||||
// FALSE: otherwise
|
// FALSE: otherwise
|
||||||
static BOOL ParseCommandLine(__in int argc,
|
static BOOL ParseCommandLine(__in int argc,
|
||||||
__in wchar_t *argv[],
|
__in_ecount(argc) wchar_t *argv[],
|
||||||
__out HardLinkCommandOption *command)
|
__out HardLinkCommandOption *command)
|
||||||
{
|
{
|
||||||
*command = HardLinkInvalid;
|
*command = HardLinkInvalid;
|
||||||
|
@ -161,7 +161,7 @@ HardlinkCreateExit:
|
||||||
// Returns:
|
// Returns:
|
||||||
// EXIT_SUCCESS: On success
|
// EXIT_SUCCESS: On success
|
||||||
// EXIT_FAILURE: otherwise
|
// EXIT_FAILURE: otherwise
|
||||||
int Hardlink(int argc, wchar_t *argv[])
|
int Hardlink(__in int argc, __in_ecount(argc) wchar_t *argv[])
|
||||||
{
|
{
|
||||||
DWORD dwErrorCode = ERROR_SUCCESS;
|
DWORD dwErrorCode = ERROR_SUCCESS;
|
||||||
int ret = EXIT_FAILURE;
|
int ret = EXIT_FAILURE;
|
||||||
|
|
|
@ -20,13 +20,11 @@
|
||||||
#endif
|
#endif
|
||||||
|
|
||||||
#pragma once
|
#pragma once
|
||||||
|
|
||||||
#include <stdio.h>
|
#include <stdio.h>
|
||||||
#include <assert.h>
|
#include <assert.h>
|
||||||
#include <windows.h>
|
#include <windows.h>
|
||||||
#include <aclapi.h>
|
#include <aclapi.h>
|
||||||
#include <accctrl.h>
|
#include <accctrl.h>
|
||||||
#include <tchar.h>
|
|
||||||
#include <strsafe.h>
|
#include <strsafe.h>
|
||||||
#include <lm.h>
|
#include <lm.h>
|
||||||
|
|
||||||
|
@ -85,25 +83,25 @@ enum WindowsAclMask
|
||||||
extern const ACCESS_MASK WinMasks[];
|
extern const ACCESS_MASK WinMasks[];
|
||||||
|
|
||||||
|
|
||||||
int Ls(int argc, wchar_t *argv[]);
|
int Ls(__in int argc, __in_ecount(argc) wchar_t *argv[]);
|
||||||
void LsUsage(LPCWSTR program);
|
void LsUsage(LPCWSTR program);
|
||||||
|
|
||||||
int Chmod(int argc, wchar_t *argv[]);
|
int Chmod(__in int argc, __in_ecount(argc) wchar_t *argv[]);
|
||||||
void ChmodUsage(LPCWSTR program);
|
void ChmodUsage(LPCWSTR program);
|
||||||
|
|
||||||
int Chown(int argc, wchar_t *argv[]);
|
int Chown(__in int argc, __in_ecount(argc) wchar_t *argv[]);
|
||||||
void ChownUsage(LPCWSTR program);
|
void ChownUsage(LPCWSTR program);
|
||||||
|
|
||||||
int Groups(int argc, wchar_t *argv[]);
|
int Groups(__in int argc, __in_ecount(argc) wchar_t *argv[]);
|
||||||
void GroupsUsage(LPCWSTR program);
|
void GroupsUsage(LPCWSTR program);
|
||||||
|
|
||||||
int Hardlink(int argc, wchar_t *argv[]);
|
int Hardlink(__in int argc, __in_ecount(argc) wchar_t *argv[]);
|
||||||
void HardlinkUsage();
|
void HardlinkUsage();
|
||||||
|
|
||||||
int Task(int argc, wchar_t *argv[]);
|
int Task(__in int argc, __in_ecount(argc) wchar_t *argv[]);
|
||||||
void TaskUsage();
|
void TaskUsage();
|
||||||
|
|
||||||
int Symlink(int argc, wchar_t *argv[]);
|
int Symlink(__in int argc, __in_ecount(argc) wchar_t *argv[]);
|
||||||
void SymlinkUsage();
|
void SymlinkUsage();
|
||||||
|
|
||||||
int SystemInfo();
|
int SystemInfo();
|
||||||
|
@ -114,9 +112,9 @@ DWORD GetFileInformationByName(__in LPCWSTR pathName, __in BOOL followLink,
|
||||||
|
|
||||||
DWORD ConvertToLongPath(__in PCWSTR path, __deref_out PWSTR *newPath);
|
DWORD ConvertToLongPath(__in PCWSTR path, __deref_out PWSTR *newPath);
|
||||||
|
|
||||||
DWORD GetSidFromAcctNameW(LPCWSTR acctName, PSID* ppSid);
|
DWORD GetSidFromAcctNameW(__in PCWSTR acctName, __out PSID* ppSid);
|
||||||
|
|
||||||
DWORD GetAccntNameFromSid(PSID pSid, LPWSTR *ppAcctName);
|
DWORD GetAccntNameFromSid(__in PSID pSid, __out LPWSTR *ppAcctName);
|
||||||
|
|
||||||
void ReportErrorCode(LPCWSTR func, DWORD err);
|
void ReportErrorCode(LPCWSTR func, DWORD err);
|
||||||
|
|
||||||
|
|
|
@ -226,7 +226,6 @@ ConvertToLongPathExit:
|
||||||
if (dwErrorCode != ERROR_SUCCESS)
|
if (dwErrorCode != ERROR_SUCCESS)
|
||||||
{
|
{
|
||||||
LocalFree(newPathValue);
|
LocalFree(newPathValue);
|
||||||
*newPath = NULL;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
return dwErrorCode;
|
return dwErrorCode;
|
||||||
|
@ -398,7 +397,7 @@ DWORD JunctionPointCheck(__in LPCWSTR pathName, __out PBOOL res)
|
||||||
// Notes:
|
// Notes:
|
||||||
// Caller needs to destroy the memory of Sid by calling LocalFree()
|
// Caller needs to destroy the memory of Sid by calling LocalFree()
|
||||||
//
|
//
|
||||||
DWORD GetSidFromAcctNameW(LPCWSTR acctName, PSID *ppSid)
|
DWORD GetSidFromAcctNameW(__in PCWSTR acctName, __out PSID *ppSid)
|
||||||
{
|
{
|
||||||
DWORD dwSidSize = 0;
|
DWORD dwSidSize = 0;
|
||||||
DWORD cchDomainName = 0;
|
DWORD cchDomainName = 0;
|
||||||
|
@ -545,7 +544,7 @@ static DWORD GetAccess(AUTHZ_CLIENT_CONTEXT_HANDLE hAuthzClient,
|
||||||
{
|
{
|
||||||
return GetLastError();
|
return GetLastError();
|
||||||
}
|
}
|
||||||
*pAccessRights = (*(PACCESS_MASK)(AccessReply.GrantedAccessMask));
|
*pAccessRights = (*(const ACCESS_MASK *)(AccessReply.GrantedAccessMask));
|
||||||
return ERROR_SUCCESS;
|
return ERROR_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1088,6 +1087,7 @@ DWORD ChangeFileModeByMask(__in LPCWSTR path, INT mode)
|
||||||
DWORD revision = 0;
|
DWORD revision = 0;
|
||||||
|
|
||||||
PSECURITY_DESCRIPTOR pAbsSD = NULL;
|
PSECURITY_DESCRIPTOR pAbsSD = NULL;
|
||||||
|
PSECURITY_DESCRIPTOR pNonNullSD = NULL;
|
||||||
PACL pAbsDacl = NULL;
|
PACL pAbsDacl = NULL;
|
||||||
PACL pAbsSacl = NULL;
|
PACL pAbsSacl = NULL;
|
||||||
PSID pAbsOwner = NULL;
|
PSID pAbsOwner = NULL;
|
||||||
|
@ -1200,7 +1200,8 @@ DWORD ChangeFileModeByMask(__in LPCWSTR path, INT mode)
|
||||||
// present in the security descriptor, the DACL is replaced. The security
|
// present in the security descriptor, the DACL is replaced. The security
|
||||||
// descriptor is then used to set the security of a file or directory.
|
// descriptor is then used to set the security of a file or directory.
|
||||||
//
|
//
|
||||||
if (!SetSecurityDescriptorDacl(pAbsSD, TRUE, pNewDACL, FALSE))
|
pNonNullSD = (pAbsSD != NULL) ? pAbsSD : pSD;
|
||||||
|
if (!SetSecurityDescriptorDacl(pNonNullSD, TRUE, pNewDACL, FALSE))
|
||||||
{
|
{
|
||||||
ret = GetLastError();
|
ret = GetLastError();
|
||||||
goto ChangeFileModeByMaskEnd;
|
goto ChangeFileModeByMaskEnd;
|
||||||
|
@ -1220,13 +1221,14 @@ DWORD ChangeFileModeByMask(__in LPCWSTR path, INT mode)
|
||||||
// its parent, and the child objects will not lose their inherited permissions
|
// its parent, and the child objects will not lose their inherited permissions
|
||||||
// from the current object.
|
// from the current object.
|
||||||
//
|
//
|
||||||
if (!SetFileSecurity(longPathName, DACL_SECURITY_INFORMATION, pAbsSD))
|
if (!SetFileSecurity(longPathName, DACL_SECURITY_INFORMATION, pNonNullSD))
|
||||||
{
|
{
|
||||||
ret = GetLastError();
|
ret = GetLastError();
|
||||||
goto ChangeFileModeByMaskEnd;
|
goto ChangeFileModeByMaskEnd;
|
||||||
}
|
}
|
||||||
|
|
||||||
ChangeFileModeByMaskEnd:
|
ChangeFileModeByMaskEnd:
|
||||||
|
pNonNullSD = NULL;
|
||||||
LocalFree(longPathName);
|
LocalFree(longPathName);
|
||||||
LocalFree(pSD);
|
LocalFree(pSD);
|
||||||
LocalFree(pNewDACL);
|
LocalFree(pNewDACL);
|
||||||
|
@ -1252,7 +1254,7 @@ ChangeFileModeByMaskEnd:
|
||||||
// Notes:
|
// Notes:
|
||||||
// Caller needs to destroy the memory of account name by calling LocalFree()
|
// Caller needs to destroy the memory of account name by calling LocalFree()
|
||||||
//
|
//
|
||||||
DWORD GetAccntNameFromSid(PSID pSid, LPWSTR *ppAcctName)
|
DWORD GetAccntNameFromSid(__in PSID pSid, __out PWSTR *ppAcctName)
|
||||||
{
|
{
|
||||||
LPWSTR lpName = NULL;
|
LPWSTR lpName = NULL;
|
||||||
DWORD cchName = 0;
|
DWORD cchName = 0;
|
||||||
|
|
|
@ -32,7 +32,7 @@
|
||||||
// altered. The caller need to initilize the mask string to be all '-' to get
|
// altered. The caller need to initilize the mask string to be all '-' to get
|
||||||
// the correct mask string.
|
// the correct mask string.
|
||||||
//
|
//
|
||||||
static BOOL GetMaskString(INT accessMask, LPWSTR maskString)
|
static BOOL GetMaskString(__in INT accessMask, __in_ecount(10) LPWSTR maskString)
|
||||||
{
|
{
|
||||||
if(wcslen(maskString) != 10)
|
if(wcslen(maskString) != 10)
|
||||||
return FALSE;
|
return FALSE;
|
||||||
|
@ -163,7 +163,10 @@ static wchar_t* CurrentDir = L".";
|
||||||
// TRUE on the valid command line, FALSE otherwise
|
// TRUE on the valid command line, FALSE otherwise
|
||||||
//
|
//
|
||||||
BOOL ParseCommandLine(
|
BOOL ParseCommandLine(
|
||||||
int argc, wchar_t *argv[], wchar_t** path, int *optionsMask)
|
__in int argc,
|
||||||
|
__in_ecount(argc) wchar_t *argv[],
|
||||||
|
__deref_out PWSTR *path,
|
||||||
|
__out int *optionsMask)
|
||||||
{
|
{
|
||||||
int MaxOptions = 2; // Should be equal to the number of elems in CmdLineOption
|
int MaxOptions = 2; // Should be equal to the number of elems in CmdLineOption
|
||||||
int i = 0;
|
int i = 0;
|
||||||
|
@ -236,7 +239,7 @@ BOOL ParseCommandLine(
|
||||||
//
|
//
|
||||||
// Notes:
|
// Notes:
|
||||||
//
|
//
|
||||||
int Ls(int argc, wchar_t *argv[])
|
int Ls(__in int argc, __in_ecount(argc) wchar_t *argv[])
|
||||||
{
|
{
|
||||||
LPWSTR pathName = NULL;
|
LPWSTR pathName = NULL;
|
||||||
LPWSTR longPathName = NULL;
|
LPWSTR longPathName = NULL;
|
||||||
|
|
|
@ -19,7 +19,7 @@
|
||||||
|
|
||||||
static void Usage(LPCWSTR program);
|
static void Usage(LPCWSTR program);
|
||||||
|
|
||||||
int wmain(int argc, wchar_t* argv[])
|
int wmain(__in int argc, __in_ecount(argc) wchar_t* argv[])
|
||||||
{
|
{
|
||||||
LPCWSTR cmd = NULL;
|
LPCWSTR cmd = NULL;
|
||||||
|
|
||||||
|
|
|
@ -28,7 +28,7 @@
|
||||||
//
|
//
|
||||||
// Notes:
|
// Notes:
|
||||||
//
|
//
|
||||||
int Symlink(int argc, wchar_t *argv[])
|
int Symlink(__in int argc, __in_ecount(argc) wchar_t *argv[])
|
||||||
{
|
{
|
||||||
PWSTR longLinkName = NULL;
|
PWSTR longLinkName = NULL;
|
||||||
PWSTR longFileName = NULL;
|
PWSTR longFileName = NULL;
|
||||||
|
|
|
@ -51,8 +51,8 @@ int SystemInfo()
|
||||||
ULONGLONG cpuTimeMs;
|
ULONGLONG cpuTimeMs;
|
||||||
size_t size;
|
size_t size;
|
||||||
LPBYTE pBuffer;
|
LPBYTE pBuffer;
|
||||||
PPROCESSOR_POWER_INFORMATION ppi;
|
PROCESSOR_POWER_INFORMATION const *ppi;
|
||||||
long cpuFrequencyKhz;
|
ULONGLONG cpuFrequencyKhz;
|
||||||
NTSTATUS status;
|
NTSTATUS status;
|
||||||
|
|
||||||
ZeroMemory(&memInfo, sizeof(PERFORMANCE_INFORMATION));
|
ZeroMemory(&memInfo, sizeof(PERFORMANCE_INFORMATION));
|
||||||
|
@ -98,11 +98,12 @@ int SystemInfo()
|
||||||
LocalFree(pBuffer);
|
LocalFree(pBuffer);
|
||||||
return EXIT_FAILURE;
|
return EXIT_FAILURE;
|
||||||
}
|
}
|
||||||
ppi = (PPROCESSOR_POWER_INFORMATION)pBuffer;
|
ppi = (PROCESSOR_POWER_INFORMATION const *)pBuffer;
|
||||||
cpuFrequencyKhz = ppi->MaxMhz*1000;
|
cpuFrequencyKhz = ppi->MaxMhz*1000;
|
||||||
LocalFree(pBuffer);
|
LocalFree(pBuffer);
|
||||||
|
|
||||||
fwprintf_s(stdout, L"%Iu,%Iu,%Iu,%Iu,%Iu,%Iu,%Iu\n", vmemSize, memSize, vmemFree, memFree, sysInfo.dwNumberOfProcessors, cpuFrequencyKhz, cpuTimeMs);
|
fwprintf_s(stdout, L"%Iu,%Iu,%Iu,%Iu,%u,%I64u,%I64u\n", vmemSize, memSize,
|
||||||
|
vmemFree, memFree, sysInfo.dwNumberOfProcessors, cpuFrequencyKhz, cpuTimeMs);
|
||||||
|
|
||||||
return EXIT_SUCCESS;
|
return EXIT_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
|
@ -50,7 +50,7 @@ typedef enum TaskCommandOptionType
|
||||||
// TRUE: If the command line is valid
|
// TRUE: If the command line is valid
|
||||||
// FALSE: otherwise
|
// FALSE: otherwise
|
||||||
static BOOL ParseCommandLine(__in int argc,
|
static BOOL ParseCommandLine(__in int argc,
|
||||||
__in wchar_t *argv[],
|
__in_ecount(argc) wchar_t *argv[],
|
||||||
__out TaskCommandOption *command)
|
__out TaskCommandOption *command)
|
||||||
{
|
{
|
||||||
*command = TaskInvalid;
|
*command = TaskInvalid;
|
||||||
|
@ -99,7 +99,7 @@ static BOOL ParseCommandLine(__in int argc,
|
||||||
// Returns:
|
// Returns:
|
||||||
// ERROR_SUCCESS: On success
|
// ERROR_SUCCESS: On success
|
||||||
// GetLastError: otherwise
|
// GetLastError: otherwise
|
||||||
DWORD createTask(_TCHAR* jobObjName, _TCHAR* cmdLine)
|
DWORD createTask(__in PCWSTR jobObjName,__in PWSTR cmdLine)
|
||||||
{
|
{
|
||||||
DWORD err = ERROR_SUCCESS;
|
DWORD err = ERROR_SUCCESS;
|
||||||
DWORD exitCode = EXIT_FAILURE;
|
DWORD exitCode = EXIT_FAILURE;
|
||||||
|
@ -138,7 +138,7 @@ DWORD createTask(_TCHAR* jobObjName, _TCHAR* cmdLine)
|
||||||
|
|
||||||
// the child JVM uses this env var to send the task OS process identifier
|
// the child JVM uses this env var to send the task OS process identifier
|
||||||
// to the TaskTracker. We pass the job object name.
|
// to the TaskTracker. We pass the job object name.
|
||||||
if(SetEnvironmentVariable(_T("JVM_PID"), jobObjName) == 0)
|
if(SetEnvironmentVariable(L"JVM_PID", jobObjName) == 0)
|
||||||
{
|
{
|
||||||
err = GetLastError();
|
err = GetLastError();
|
||||||
CloseHandle(jobObject);
|
CloseHandle(jobObject);
|
||||||
|
@ -148,12 +148,14 @@ DWORD createTask(_TCHAR* jobObjName, _TCHAR* cmdLine)
|
||||||
ZeroMemory( &si, sizeof(si) );
|
ZeroMemory( &si, sizeof(si) );
|
||||||
si.cb = sizeof(si);
|
si.cb = sizeof(si);
|
||||||
ZeroMemory( &pi, sizeof(pi) );
|
ZeroMemory( &pi, sizeof(pi) );
|
||||||
|
|
||||||
if (CreateProcess(NULL, cmdLine, NULL, NULL, TRUE, 0, NULL, NULL, &si, &pi) == 0)
|
if (CreateProcess(NULL, cmdLine, NULL, NULL, TRUE, 0, NULL, NULL, &si, &pi) == 0)
|
||||||
{
|
{
|
||||||
err = GetLastError();
|
err = GetLastError();
|
||||||
CloseHandle(jobObject);
|
CloseHandle(jobObject);
|
||||||
return err;
|
return err;
|
||||||
}
|
}
|
||||||
|
|
||||||
CloseHandle(pi.hThread);
|
CloseHandle(pi.hThread);
|
||||||
|
|
||||||
// Wait until child process exits.
|
// Wait until child process exits.
|
||||||
|
@ -194,7 +196,7 @@ DWORD createTask(_TCHAR* jobObjName, _TCHAR* cmdLine)
|
||||||
// Returns:
|
// Returns:
|
||||||
// ERROR_SUCCESS: On success
|
// ERROR_SUCCESS: On success
|
||||||
// GetLastError: otherwise
|
// GetLastError: otherwise
|
||||||
DWORD isTaskAlive(const _TCHAR* jobObjName, int* isAlive, int* procsInJob)
|
DWORD isTaskAlive(const WCHAR* jobObjName, int* isAlive, int* procsInJob)
|
||||||
{
|
{
|
||||||
PJOBOBJECT_BASIC_PROCESS_ID_LIST procList;
|
PJOBOBJECT_BASIC_PROCESS_ID_LIST procList;
|
||||||
HANDLE jobObject = NULL;
|
HANDLE jobObject = NULL;
|
||||||
|
@ -254,7 +256,7 @@ DWORD isTaskAlive(const _TCHAR* jobObjName, int* isAlive, int* procsInJob)
|
||||||
// Returns:
|
// Returns:
|
||||||
// ERROR_SUCCESS: On success
|
// ERROR_SUCCESS: On success
|
||||||
// GetLastError: otherwise
|
// GetLastError: otherwise
|
||||||
DWORD killTask(_TCHAR* jobObjName)
|
DWORD killTask(PCWSTR jobObjName)
|
||||||
{
|
{
|
||||||
HANDLE jobObject = OpenJobObject(JOB_OBJECT_TERMINATE, FALSE, jobObjName);
|
HANDLE jobObject = OpenJobObject(JOB_OBJECT_TERMINATE, FALSE, jobObjName);
|
||||||
if(jobObject == NULL)
|
if(jobObject == NULL)
|
||||||
|
@ -286,7 +288,7 @@ DWORD killTask(_TCHAR* jobObjName)
|
||||||
// Returns:
|
// Returns:
|
||||||
// ERROR_SUCCESS: On success
|
// ERROR_SUCCESS: On success
|
||||||
// GetLastError: otherwise
|
// GetLastError: otherwise
|
||||||
DWORD printTaskProcessList(const _TCHAR* jobObjName)
|
DWORD printTaskProcessList(const WCHAR* jobObjName)
|
||||||
{
|
{
|
||||||
DWORD i;
|
DWORD i;
|
||||||
PJOBOBJECT_BASIC_PROCESS_ID_LIST procList;
|
PJOBOBJECT_BASIC_PROCESS_ID_LIST procList;
|
||||||
|
@ -317,9 +319,9 @@ DWORD printTaskProcessList(const _TCHAR* jobObjName)
|
||||||
numProcs = procList->NumberOfAssignedProcesses;
|
numProcs = procList->NumberOfAssignedProcesses;
|
||||||
LocalFree(procList);
|
LocalFree(procList);
|
||||||
procList = (PJOBOBJECT_BASIC_PROCESS_ID_LIST) LocalAlloc(LPTR, sizeof (JOBOBJECT_BASIC_PROCESS_ID_LIST) + numProcs*32);
|
procList = (PJOBOBJECT_BASIC_PROCESS_ID_LIST) LocalAlloc(LPTR, sizeof (JOBOBJECT_BASIC_PROCESS_ID_LIST) + numProcs*32);
|
||||||
if (!procList)
|
if (procList == NULL)
|
||||||
{
|
{
|
||||||
DWORD err = GetLastError();
|
err = GetLastError();
|
||||||
CloseHandle(jobObject);
|
CloseHandle(jobObject);
|
||||||
return err;
|
return err;
|
||||||
}
|
}
|
||||||
|
@ -343,7 +345,7 @@ DWORD printTaskProcessList(const _TCHAR* jobObjName)
|
||||||
userTime.HighPart = user.dwHighDateTime;
|
userTime.HighPart = user.dwHighDateTime;
|
||||||
userTime.LowPart = user.dwLowDateTime;
|
userTime.LowPart = user.dwLowDateTime;
|
||||||
cpuTimeMs = (kernelTime.QuadPart+userTime.QuadPart)/10000;
|
cpuTimeMs = (kernelTime.QuadPart+userTime.QuadPart)/10000;
|
||||||
_ftprintf_s(stdout, TEXT("%u,%Iu,%Iu,%Iu\n"), procList->ProcessIdList[i], pmc.PrivateUsage, pmc.WorkingSetSize, cpuTimeMs);
|
fwprintf_s(stdout, L"%Iu,%Iu,%Iu,%I64u\n", procList->ProcessIdList[i], pmc.PrivateUsage, pmc.WorkingSetSize, cpuTimeMs);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
CloseHandle( hProcess );
|
CloseHandle( hProcess );
|
||||||
|
@ -366,7 +368,7 @@ DWORD printTaskProcessList(const _TCHAR* jobObjName)
|
||||||
// Returns:
|
// Returns:
|
||||||
// ERROR_SUCCESS: On success
|
// ERROR_SUCCESS: On success
|
||||||
// Error code otherwise: otherwise
|
// Error code otherwise: otherwise
|
||||||
int Task(int argc, wchar_t *argv[])
|
int Task(__in int argc, __in_ecount(argc) wchar_t *argv[])
|
||||||
{
|
{
|
||||||
DWORD dwErrorCode = ERROR_SUCCESS;
|
DWORD dwErrorCode = ERROR_SUCCESS;
|
||||||
TaskCommandOption command = TaskInvalid;
|
TaskCommandOption command = TaskInvalid;
|
||||||
|
|
|
@ -327,6 +327,12 @@ Release 2.0.5-beta - UNRELEASED
|
||||||
MAPREDUCE-5163. Update MR App to not use API utility methods for collections
|
MAPREDUCE-5163. Update MR App to not use API utility methods for collections
|
||||||
after YARN-441. (Xuan Gong via vinodkv)
|
after YARN-441. (Xuan Gong via vinodkv)
|
||||||
|
|
||||||
|
MAPREDUCE-5066. Added a timeout for the job.end.notification.url. (Ivan
|
||||||
|
Mitic via acmurthy)
|
||||||
|
|
||||||
|
MAPREDUCE-5146. application classloader may be used too early to load
|
||||||
|
classes. (Sangjin Lee via tomwhite)
|
||||||
|
|
||||||
Release 2.0.4-alpha - UNRELEASED
|
Release 2.0.4-alpha - UNRELEASED
|
||||||
|
|
||||||
INCOMPATIBLE CHANGES
|
INCOMPATIBLE CHANGES
|
||||||
|
|
|
@ -20,10 +20,8 @@ package org.apache.hadoop.mapred;
|
||||||
|
|
||||||
import static java.util.concurrent.TimeUnit.MILLISECONDS;
|
import static java.util.concurrent.TimeUnit.MILLISECONDS;
|
||||||
|
|
||||||
import java.io.ByteArrayOutputStream;
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.io.OutputStream;
|
import java.io.OutputStream;
|
||||||
import java.io.PrintStream;
|
|
||||||
import java.net.InetSocketAddress;
|
import java.net.InetSocketAddress;
|
||||||
import java.net.URI;
|
import java.net.URI;
|
||||||
import java.security.PrivilegedExceptionAction;
|
import java.security.PrivilegedExceptionAction;
|
||||||
|
@ -148,6 +146,9 @@ class YarnChild {
|
||||||
// Add tokens to new user so that it may execute its task correctly.
|
// Add tokens to new user so that it may execute its task correctly.
|
||||||
childUGI.addCredentials(credentials);
|
childUGI.addCredentials(credentials);
|
||||||
|
|
||||||
|
// set job classloader if configured before invoking the task
|
||||||
|
MRApps.setJobClassLoader(job);
|
||||||
|
|
||||||
// Create a final reference to the task for the doAs block
|
// Create a final reference to the task for the doAs block
|
||||||
final Task taskFinal = task;
|
final Task taskFinal = task;
|
||||||
childUGI.doAs(new PrivilegedExceptionAction<Object>() {
|
childUGI.doAs(new PrivilegedExceptionAction<Object>() {
|
||||||
|
@ -255,9 +256,6 @@ class YarnChild {
|
||||||
final JobConf job = new JobConf(MRJobConfig.JOB_CONF_FILE);
|
final JobConf job = new JobConf(MRJobConfig.JOB_CONF_FILE);
|
||||||
job.setCredentials(credentials);
|
job.setCredentials(credentials);
|
||||||
|
|
||||||
// set job classloader if configured
|
|
||||||
MRApps.setJobClassLoader(job);
|
|
||||||
|
|
||||||
String appAttemptIdEnv = System
|
String appAttemptIdEnv = System
|
||||||
.getenv(MRJobConfig.APPLICATION_ATTEMPT_ID_ENV);
|
.getenv(MRJobConfig.APPLICATION_ATTEMPT_ID_ENV);
|
||||||
LOG.debug("APPLICATION_ATTEMPT_ID: " + appAttemptIdEnv);
|
LOG.debug("APPLICATION_ATTEMPT_ID: " + appAttemptIdEnv);
|
||||||
|
|
|
@ -27,6 +27,7 @@ import java.net.URL;
|
||||||
|
|
||||||
import org.apache.hadoop.conf.Configurable;
|
import org.apache.hadoop.conf.Configurable;
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
|
import org.apache.hadoop.mapred.JobContext;
|
||||||
import org.apache.hadoop.mapreduce.MRJobConfig;
|
import org.apache.hadoop.mapreduce.MRJobConfig;
|
||||||
import org.apache.hadoop.mapreduce.v2.api.records.JobReport;
|
import org.apache.hadoop.mapreduce.v2.api.records.JobReport;
|
||||||
import org.mortbay.log.Log;
|
import org.mortbay.log.Log;
|
||||||
|
@ -54,6 +55,7 @@ public class JobEndNotifier implements Configurable {
|
||||||
protected String proxyConf;
|
protected String proxyConf;
|
||||||
protected int numTries; //Number of tries to attempt notification
|
protected int numTries; //Number of tries to attempt notification
|
||||||
protected int waitInterval; //Time (ms) to wait between retrying notification
|
protected int waitInterval; //Time (ms) to wait between retrying notification
|
||||||
|
protected int timeout; // Timeout (ms) on the connection and notification
|
||||||
protected URL urlToNotify; //URL to notify read from the config
|
protected URL urlToNotify; //URL to notify read from the config
|
||||||
protected Proxy proxyToUse = Proxy.NO_PROXY; //Proxy to use for notification
|
protected Proxy proxyToUse = Proxy.NO_PROXY; //Proxy to use for notification
|
||||||
|
|
||||||
|
@ -76,6 +78,9 @@ public class JobEndNotifier implements Configurable {
|
||||||
);
|
);
|
||||||
waitInterval = (waitInterval < 0) ? 5000 : waitInterval;
|
waitInterval = (waitInterval < 0) ? 5000 : waitInterval;
|
||||||
|
|
||||||
|
timeout = conf.getInt(JobContext.MR_JOB_END_NOTIFICATION_TIMEOUT,
|
||||||
|
JobContext.DEFAULT_MR_JOB_END_NOTIFICATION_TIMEOUT);
|
||||||
|
|
||||||
userUrl = conf.get(MRJobConfig.MR_JOB_END_NOTIFICATION_URL);
|
userUrl = conf.get(MRJobConfig.MR_JOB_END_NOTIFICATION_URL);
|
||||||
|
|
||||||
proxyConf = conf.get(MRJobConfig.MR_JOB_END_NOTIFICATION_PROXY);
|
proxyConf = conf.get(MRJobConfig.MR_JOB_END_NOTIFICATION_PROXY);
|
||||||
|
@ -112,8 +117,7 @@ public class JobEndNotifier implements Configurable {
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Notify the URL just once. Use best effort. Timeout hard coded to 5
|
* Notify the URL just once. Use best effort.
|
||||||
* seconds.
|
|
||||||
*/
|
*/
|
||||||
protected boolean notifyURLOnce() {
|
protected boolean notifyURLOnce() {
|
||||||
boolean success = false;
|
boolean success = false;
|
||||||
|
@ -121,8 +125,8 @@ public class JobEndNotifier implements Configurable {
|
||||||
Log.info("Job end notification trying " + urlToNotify);
|
Log.info("Job end notification trying " + urlToNotify);
|
||||||
HttpURLConnection conn =
|
HttpURLConnection conn =
|
||||||
(HttpURLConnection) urlToNotify.openConnection(proxyToUse);
|
(HttpURLConnection) urlToNotify.openConnection(proxyToUse);
|
||||||
conn.setConnectTimeout(5*1000);
|
conn.setConnectTimeout(timeout);
|
||||||
conn.setReadTimeout(5*1000);
|
conn.setReadTimeout(timeout);
|
||||||
conn.setAllowUserInteraction(false);
|
conn.setAllowUserInteraction(false);
|
||||||
if(conn.getResponseCode() != HttpURLConnection.HTTP_OK) {
|
if(conn.getResponseCode() != HttpURLConnection.HTTP_OK) {
|
||||||
Log.warn("Job end notification to " + urlToNotify +" failed with code: "
|
Log.warn("Job end notification to " + urlToNotify +" failed with code: "
|
||||||
|
|
|
@ -73,6 +73,13 @@ public class TestJobEndNotifier extends JobEndNotifier {
|
||||||
+ waitInterval, waitInterval == 5000);
|
+ waitInterval, waitInterval == 5000);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private void testTimeout(Configuration conf) {
|
||||||
|
conf.set(MRJobConfig.MR_JOB_END_NOTIFICATION_TIMEOUT, "1000");
|
||||||
|
setConf(conf);
|
||||||
|
Assert.assertTrue("Expected timeout to be 1000, but was "
|
||||||
|
+ timeout, timeout == 1000);
|
||||||
|
}
|
||||||
|
|
||||||
private void testProxyConfiguration(Configuration conf) {
|
private void testProxyConfiguration(Configuration conf) {
|
||||||
conf.set(MRJobConfig.MR_JOB_END_NOTIFICATION_PROXY, "somehost");
|
conf.set(MRJobConfig.MR_JOB_END_NOTIFICATION_PROXY, "somehost");
|
||||||
setConf(conf);
|
setConf(conf);
|
||||||
|
@ -109,6 +116,7 @@ public class TestJobEndNotifier extends JobEndNotifier {
|
||||||
Configuration conf = new Configuration();
|
Configuration conf = new Configuration();
|
||||||
testNumRetries(conf);
|
testNumRetries(conf);
|
||||||
testWaitInterval(conf);
|
testWaitInterval(conf);
|
||||||
|
testTimeout(conf);
|
||||||
testProxyConfiguration(conf);
|
testProxyConfiguration(conf);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -44,9 +44,10 @@ public class JobEndNotifier {
|
||||||
JobEndStatusInfo notification = null;
|
JobEndStatusInfo notification = null;
|
||||||
String uri = conf.getJobEndNotificationURI();
|
String uri = conf.getJobEndNotificationURI();
|
||||||
if (uri != null) {
|
if (uri != null) {
|
||||||
// +1 to make logic for first notification identical to a retry
|
int retryAttempts = conf.getInt(JobContext.MR_JOB_END_RETRY_ATTEMPTS, 0);
|
||||||
int retryAttempts = conf.getInt(JobContext.MR_JOB_END_RETRY_ATTEMPTS, 0) + 1;
|
|
||||||
long retryInterval = conf.getInt(JobContext.MR_JOB_END_RETRY_INTERVAL, 30000);
|
long retryInterval = conf.getInt(JobContext.MR_JOB_END_RETRY_INTERVAL, 30000);
|
||||||
|
int timeout = conf.getInt(JobContext.MR_JOB_END_NOTIFICATION_TIMEOUT,
|
||||||
|
JobContext.DEFAULT_MR_JOB_END_NOTIFICATION_TIMEOUT);
|
||||||
if (uri.contains("$jobId")) {
|
if (uri.contains("$jobId")) {
|
||||||
uri = uri.replace("$jobId", status.getJobID().toString());
|
uri = uri.replace("$jobId", status.getJobID().toString());
|
||||||
}
|
}
|
||||||
|
@ -56,17 +57,22 @@ public class JobEndNotifier {
|
||||||
(status.getRunState() == JobStatus.FAILED) ? "FAILED" : "KILLED";
|
(status.getRunState() == JobStatus.FAILED) ? "FAILED" : "KILLED";
|
||||||
uri = uri.replace("$jobStatus", statusStr);
|
uri = uri.replace("$jobStatus", statusStr);
|
||||||
}
|
}
|
||||||
notification = new JobEndStatusInfo(uri, retryAttempts, retryInterval);
|
notification = new JobEndStatusInfo(
|
||||||
|
uri, retryAttempts, retryInterval, timeout);
|
||||||
}
|
}
|
||||||
return notification;
|
return notification;
|
||||||
}
|
}
|
||||||
|
|
||||||
private static int httpNotification(String uri) throws IOException {
|
private static int httpNotification(String uri, int timeout)
|
||||||
|
throws IOException {
|
||||||
URI url = new URI(uri, false);
|
URI url = new URI(uri, false);
|
||||||
HttpClient m_client = new HttpClient();
|
HttpClient httpClient = new HttpClient();
|
||||||
|
httpClient.getParams().setSoTimeout(timeout);
|
||||||
|
httpClient.getParams().setConnectionManagerTimeout(timeout);
|
||||||
|
|
||||||
HttpMethod method = new GetMethod(url.getEscapedURI());
|
HttpMethod method = new GetMethod(url.getEscapedURI());
|
||||||
method.setRequestHeader("Accept", "*/*");
|
method.setRequestHeader("Accept", "*/*");
|
||||||
return m_client.executeMethod(method);
|
return httpClient.executeMethod(method);
|
||||||
}
|
}
|
||||||
|
|
||||||
// for use by the LocalJobRunner, without using a thread&queue,
|
// for use by the LocalJobRunner, without using a thread&queue,
|
||||||
|
@ -74,9 +80,10 @@ public class JobEndNotifier {
|
||||||
public static void localRunnerNotification(JobConf conf, JobStatus status) {
|
public static void localRunnerNotification(JobConf conf, JobStatus status) {
|
||||||
JobEndStatusInfo notification = createNotification(conf, status);
|
JobEndStatusInfo notification = createNotification(conf, status);
|
||||||
if (notification != null) {
|
if (notification != null) {
|
||||||
while (notification.configureForRetry()) {
|
do {
|
||||||
try {
|
try {
|
||||||
int code = httpNotification(notification.getUri());
|
int code = httpNotification(notification.getUri(),
|
||||||
|
notification.getTimeout());
|
||||||
if (code != 200) {
|
if (code != 200) {
|
||||||
throw new IOException("Invalid response status code: " + code);
|
throw new IOException("Invalid response status code: " + code);
|
||||||
}
|
}
|
||||||
|
@ -96,7 +103,7 @@ public class JobEndNotifier {
|
||||||
catch (InterruptedException iex) {
|
catch (InterruptedException iex) {
|
||||||
LOG.error("Notification retry error [" + notification + "]", iex);
|
LOG.error("Notification retry error [" + notification + "]", iex);
|
||||||
}
|
}
|
||||||
}
|
} while (notification.configureForRetry());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -105,12 +112,15 @@ public class JobEndNotifier {
|
||||||
private int retryAttempts;
|
private int retryAttempts;
|
||||||
private long retryInterval;
|
private long retryInterval;
|
||||||
private long delayTime;
|
private long delayTime;
|
||||||
|
private int timeout;
|
||||||
|
|
||||||
JobEndStatusInfo(String uri, int retryAttempts, long retryInterval) {
|
JobEndStatusInfo(String uri, int retryAttempts, long retryInterval,
|
||||||
|
int timeout) {
|
||||||
this.uri = uri;
|
this.uri = uri;
|
||||||
this.retryAttempts = retryAttempts;
|
this.retryAttempts = retryAttempts;
|
||||||
this.retryInterval = retryInterval;
|
this.retryInterval = retryInterval;
|
||||||
this.delayTime = System.currentTimeMillis();
|
this.delayTime = System.currentTimeMillis();
|
||||||
|
this.timeout = timeout;
|
||||||
}
|
}
|
||||||
|
|
||||||
public String getUri() {
|
public String getUri() {
|
||||||
|
@ -125,6 +135,10 @@ public class JobEndNotifier {
|
||||||
return retryInterval;
|
return retryInterval;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public int getTimeout() {
|
||||||
|
return timeout;
|
||||||
|
}
|
||||||
|
|
||||||
public boolean configureForRetry() {
|
public boolean configureForRetry() {
|
||||||
boolean retry = false;
|
boolean retry = false;
|
||||||
if (getRetryAttempts() > 0) {
|
if (getRetryAttempts() > 0) {
|
||||||
|
|
|
@ -616,6 +616,9 @@ public interface MRJobConfig {
|
||||||
public static final String MR_JOB_END_NOTIFICATION_PROXY =
|
public static final String MR_JOB_END_NOTIFICATION_PROXY =
|
||||||
"mapreduce.job.end-notification.proxy";
|
"mapreduce.job.end-notification.proxy";
|
||||||
|
|
||||||
|
public static final String MR_JOB_END_NOTIFICATION_TIMEOUT =
|
||||||
|
"mapreduce.job.end-notification.timeout";
|
||||||
|
|
||||||
public static final String MR_JOB_END_RETRY_ATTEMPTS =
|
public static final String MR_JOB_END_RETRY_ATTEMPTS =
|
||||||
"mapreduce.job.end-notification.retry.attempts";
|
"mapreduce.job.end-notification.retry.attempts";
|
||||||
|
|
||||||
|
@ -628,6 +631,9 @@ public interface MRJobConfig {
|
||||||
public static final String MR_JOB_END_NOTIFICATION_MAX_RETRY_INTERVAL =
|
public static final String MR_JOB_END_NOTIFICATION_MAX_RETRY_INTERVAL =
|
||||||
"mapreduce.job.end-notification.max.retry.interval";
|
"mapreduce.job.end-notification.max.retry.interval";
|
||||||
|
|
||||||
|
public static final int DEFAULT_MR_JOB_END_NOTIFICATION_TIMEOUT =
|
||||||
|
5000;
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* MR AM Service Authorization
|
* MR AM Service Authorization
|
||||||
*/
|
*/
|
||||||
|
|
|
@ -0,0 +1,197 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.apache.hadoop.mapred;
|
||||||
|
|
||||||
|
import java.io.File;
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.io.InputStreamReader;
|
||||||
|
import java.io.PrintStream;
|
||||||
|
import java.net.URI;
|
||||||
|
import java.net.URISyntaxException;
|
||||||
|
import java.net.URL;
|
||||||
|
|
||||||
|
import javax.servlet.ServletException;
|
||||||
|
import javax.servlet.http.HttpServlet;
|
||||||
|
import javax.servlet.http.HttpServletRequest;
|
||||||
|
import javax.servlet.http.HttpServletResponse;
|
||||||
|
|
||||||
|
import junit.framework.TestCase;
|
||||||
|
|
||||||
|
import org.apache.hadoop.conf.Configuration;
|
||||||
|
import org.apache.hadoop.http.HttpServer;
|
||||||
|
|
||||||
|
public class TestJobEndNotifier extends TestCase {
|
||||||
|
HttpServer server;
|
||||||
|
URL baseUrl;
|
||||||
|
|
||||||
|
@SuppressWarnings("serial")
|
||||||
|
public static class JobEndServlet extends HttpServlet {
|
||||||
|
public static volatile int calledTimes = 0;
|
||||||
|
public static URI requestUri;
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void doGet(HttpServletRequest request,
|
||||||
|
HttpServletResponse response
|
||||||
|
) throws ServletException, IOException {
|
||||||
|
InputStreamReader in = new InputStreamReader(request.getInputStream());
|
||||||
|
PrintStream out = new PrintStream(response.getOutputStream());
|
||||||
|
|
||||||
|
calledTimes++;
|
||||||
|
try {
|
||||||
|
requestUri = new URI(null, null,
|
||||||
|
request.getRequestURI(), request.getQueryString(), null);
|
||||||
|
} catch (URISyntaxException e) {
|
||||||
|
}
|
||||||
|
|
||||||
|
in.close();
|
||||||
|
out.close();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Servlet that delays requests for a long time
|
||||||
|
@SuppressWarnings("serial")
|
||||||
|
public static class DelayServlet extends HttpServlet {
|
||||||
|
public static volatile int calledTimes = 0;
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void doGet(HttpServletRequest request,
|
||||||
|
HttpServletResponse response
|
||||||
|
) throws ServletException, IOException {
|
||||||
|
boolean timedOut = false;
|
||||||
|
calledTimes++;
|
||||||
|
try {
|
||||||
|
// Sleep for a long time
|
||||||
|
Thread.sleep(1000000);
|
||||||
|
} catch (InterruptedException e) {
|
||||||
|
timedOut = true;
|
||||||
|
}
|
||||||
|
assertTrue("DelayServlet should be interrupted", timedOut);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Servlet that fails all requests into it
|
||||||
|
@SuppressWarnings("serial")
|
||||||
|
public static class FailServlet extends HttpServlet {
|
||||||
|
public static volatile int calledTimes = 0;
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void doGet(HttpServletRequest request,
|
||||||
|
HttpServletResponse response
|
||||||
|
) throws ServletException, IOException {
|
||||||
|
calledTimes++;
|
||||||
|
throw new IOException("I am failing!");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setUp() throws Exception {
|
||||||
|
new File(System.getProperty("build.webapps", "build/webapps") + "/test"
|
||||||
|
).mkdirs();
|
||||||
|
server = new HttpServer("test", "0.0.0.0", 0, true);
|
||||||
|
server.addServlet("delay", "/delay", DelayServlet.class);
|
||||||
|
server.addServlet("jobend", "/jobend", JobEndServlet.class);
|
||||||
|
server.addServlet("fail", "/fail", FailServlet.class);
|
||||||
|
server.start();
|
||||||
|
int port = server.getPort();
|
||||||
|
baseUrl = new URL("http://localhost:" + port + "/");
|
||||||
|
|
||||||
|
JobEndServlet.calledTimes = 0;
|
||||||
|
JobEndServlet.requestUri = null;
|
||||||
|
DelayServlet.calledTimes = 0;
|
||||||
|
FailServlet.calledTimes = 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void tearDown() throws Exception {
|
||||||
|
server.stop();
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Basic validation for localRunnerNotification.
|
||||||
|
*/
|
||||||
|
public void testLocalJobRunnerUriSubstitution() throws InterruptedException {
|
||||||
|
JobStatus jobStatus = createTestJobStatus(
|
||||||
|
"job_20130313155005308_0001", JobStatus.SUCCEEDED);
|
||||||
|
JobConf jobConf = createTestJobConf(
|
||||||
|
new Configuration(), 0,
|
||||||
|
baseUrl + "jobend?jobid=$jobId&status=$jobStatus");
|
||||||
|
JobEndNotifier.localRunnerNotification(jobConf, jobStatus);
|
||||||
|
|
||||||
|
// No need to wait for the notification to go thru since calls are
|
||||||
|
// synchronous
|
||||||
|
|
||||||
|
// Validate params
|
||||||
|
assertEquals(1, JobEndServlet.calledTimes);
|
||||||
|
assertEquals("jobid=job_20130313155005308_0001&status=SUCCEEDED",
|
||||||
|
JobEndServlet.requestUri.getQuery());
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Validate job.end.retry.attempts for the localJobRunner.
|
||||||
|
*/
|
||||||
|
public void testLocalJobRunnerRetryCount() throws InterruptedException {
|
||||||
|
int retryAttempts = 3;
|
||||||
|
JobStatus jobStatus = createTestJobStatus(
|
||||||
|
"job_20130313155005308_0001", JobStatus.SUCCEEDED);
|
||||||
|
JobConf jobConf = createTestJobConf(
|
||||||
|
new Configuration(), retryAttempts, baseUrl + "fail");
|
||||||
|
JobEndNotifier.localRunnerNotification(jobConf, jobStatus);
|
||||||
|
|
||||||
|
// Validate params
|
||||||
|
assertEquals(retryAttempts + 1, FailServlet.calledTimes);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Validate that the notification times out after reaching
|
||||||
|
* mapreduce.job.end-notification.timeout.
|
||||||
|
*/
|
||||||
|
public void testNotificationTimeout() throws InterruptedException {
|
||||||
|
Configuration conf = new Configuration();
|
||||||
|
// Reduce the timeout to 1 second
|
||||||
|
conf.setInt("mapreduce.job.end-notification.timeout", 1000);
|
||||||
|
|
||||||
|
JobStatus jobStatus = createTestJobStatus(
|
||||||
|
"job_20130313155005308_0001", JobStatus.SUCCEEDED);
|
||||||
|
JobConf jobConf = createTestJobConf(
|
||||||
|
conf, 0,
|
||||||
|
baseUrl + "delay");
|
||||||
|
long startTime = System.currentTimeMillis();
|
||||||
|
JobEndNotifier.localRunnerNotification(jobConf, jobStatus);
|
||||||
|
long elapsedTime = System.currentTimeMillis() - startTime;
|
||||||
|
|
||||||
|
// Validate params
|
||||||
|
assertEquals(1, DelayServlet.calledTimes);
|
||||||
|
// Make sure we timed out with time slightly above 1 second
|
||||||
|
// (default timeout is in terms of minutes, so we'll catch the problem)
|
||||||
|
assertTrue(elapsedTime < 2000);
|
||||||
|
}
|
||||||
|
|
||||||
|
private static JobStatus createTestJobStatus(String jobId, int state) {
|
||||||
|
return new JobStatus(
|
||||||
|
JobID.forName(jobId), 0.5f, 0.0f,
|
||||||
|
state, "root", "TestJobEndNotifier", null, null);
|
||||||
|
}
|
||||||
|
|
||||||
|
private static JobConf createTestJobConf(
|
||||||
|
Configuration conf, int retryAttempts, String notificationUri) {
|
||||||
|
JobConf jobConf = new JobConf(conf);
|
||||||
|
jobConf.setInt("job.end.retry.attempts", retryAttempts);
|
||||||
|
jobConf.set("job.end.retry.interval", "0");
|
||||||
|
jobConf.setJobEndNotificationURI(notificationUri);
|
||||||
|
return jobConf;
|
||||||
|
}
|
||||||
|
}
|
|
@ -275,6 +275,8 @@ Release 2.0.5-beta - UNRELEASED
|
||||||
YARN-547. Fixed race conditions in public and private resource localization
|
YARN-547. Fixed race conditions in public and private resource localization
|
||||||
which used to cause duplicate downloads. (Omkar Vinit Joshi via vinodkv)
|
which used to cause duplicate downloads. (Omkar Vinit Joshi via vinodkv)
|
||||||
|
|
||||||
|
YARN-594. Update test and add comments in YARN-534 (Jian He via bikas)
|
||||||
|
|
||||||
Release 2.0.4-alpha - UNRELEASED
|
Release 2.0.4-alpha - UNRELEASED
|
||||||
|
|
||||||
INCOMPATIBLE CHANGES
|
INCOMPATIBLE CHANGES
|
||||||
|
|
|
@ -340,6 +340,8 @@ public class RMAppManager implements EventHandler<RMAppManagerEvent>,
|
||||||
} else {
|
} else {
|
||||||
maxAppAttempts = individualMaxAppAttempts;
|
maxAppAttempts = individualMaxAppAttempts;
|
||||||
}
|
}
|
||||||
|
// In work-preserve restart, if attemptCount == maxAttempts, the job still
|
||||||
|
// needs to be recovered because the last attempt may still be running.
|
||||||
if(appState.getAttemptCount() >= maxAppAttempts) {
|
if(appState.getAttemptCount() >= maxAppAttempts) {
|
||||||
LOG.info("Not recovering application " + appState.getAppId() +
|
LOG.info("Not recovering application " + appState.getAppId() +
|
||||||
" due to recovering attempt is beyond maxAppAttempt limit");
|
" due to recovering attempt is beyond maxAppAttempt limit");
|
||||||
|
|
|
@ -364,7 +364,6 @@ public class TestRMRestart {
|
||||||
Assert.assertNotNull(attemptState);
|
Assert.assertNotNull(attemptState);
|
||||||
Assert.assertEquals(BuilderUtils.newContainerId(attemptId1, 1),
|
Assert.assertEquals(BuilderUtils.newContainerId(attemptId1, 1),
|
||||||
attemptState.getMasterContainer().getId());
|
attemptState.getMasterContainer().getId());
|
||||||
rm1.stop();
|
|
||||||
|
|
||||||
// start new RM
|
// start new RM
|
||||||
MockRM rm2 = new MockRM(conf, memStore);
|
MockRM rm2 = new MockRM(conf, memStore);
|
||||||
|
@ -382,7 +381,12 @@ public class TestRMRestart {
|
||||||
Assert.assertNull(rm2.getRMContext().getRMApps()
|
Assert.assertNull(rm2.getRMContext().getRMApps()
|
||||||
.get(app1.getApplicationId()));
|
.get(app1.getApplicationId()));
|
||||||
|
|
||||||
|
// verify that app2 is stored, app1 is removed
|
||||||
|
Assert.assertNotNull(rmAppState.get(app2.getApplicationId()));
|
||||||
|
Assert.assertNull(rmAppState.get(app1.getApplicationId()));
|
||||||
|
|
||||||
// stop the RM
|
// stop the RM
|
||||||
|
rm1.stop();
|
||||||
rm2.stop();
|
rm2.stop();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue