YARN-2190. Added CPU and memory limit options to the default container executor for Windows containers. Contributed by Chuan Liu

This commit is contained in:
Jian He 2015-03-06 14:17:57 -08:00
parent 01bfe6f05b
commit 21101c01f2
12 changed files with 360 additions and 35 deletions

View File

@ -209,7 +209,8 @@ Requirements:
* Findbugs 1.3.9 (if running findbugs) * Findbugs 1.3.9 (if running findbugs)
* ProtocolBuffer 2.5.0 * ProtocolBuffer 2.5.0
* CMake 2.6 or newer * CMake 2.6 or newer
* Windows SDK or Visual Studio 2010 Professional * Windows SDK 7.1 or Visual Studio 2010 Professional
* Windows SDK 8.1 (if building CPU rate control for the container executor)
* zlib headers (if building native code bindings for zlib) * zlib headers (if building native code bindings for zlib)
* Internet connection for first build (to fetch all Maven and Hadoop dependencies) * Internet connection for first build (to fetch all Maven and Hadoop dependencies)
* Unix command-line tools from GnuWin32: sh, mkdir, rm, cp, tar, gzip. These * Unix command-line tools from GnuWin32: sh, mkdir, rm, cp, tar, gzip. These
@ -220,11 +221,15 @@ can be downloaded from http://git-scm.com/download/win.
If using Visual Studio, it must be Visual Studio 2010 Professional (not 2012). If using Visual Studio, it must be Visual Studio 2010 Professional (not 2012).
Do not use Visual Studio Express. It does not support compiling for 64-bit, Do not use Visual Studio Express. It does not support compiling for 64-bit,
which is problematic if running a 64-bit system. The Windows SDK is free to which is problematic if running a 64-bit system. The Windows SDK 7.1 is free to
download here: download here:
http://www.microsoft.com/en-us/download/details.aspx?id=8279 http://www.microsoft.com/en-us/download/details.aspx?id=8279
The Windows SDK 8.1 is available to download at:
http://msdn.microsoft.com/en-us/windows/bg162891.aspx
Cygwin is neither required nor supported. Cygwin is neither required nor supported.
---------------------------------------------------------------------------------- ----------------------------------------------------------------------------------

View File

@ -49,6 +49,31 @@ typedef enum TaskCommandOptionType
TaskProcessList TaskProcessList
} TaskCommandOption; } TaskCommandOption;
//----------------------------------------------------------------------------
// Function: GetLimit
//
// Description:
// Get the resource limit value in long type given the command line argument.
//
// Returns:
// TRUE: If successfully get the value
// FALSE: otherwise
static BOOL GetLimit(__in const wchar_t *str, __out long *value)
{
wchar_t *end = NULL;
if (str == NULL || value == NULL) return FALSE;
*value = wcstol(str, &end, 10);
if (end == NULL || *end != '\0')
{
*value = -1;
return FALSE;
}
else
{
return TRUE;
}
}
//---------------------------------------------------------------------------- //----------------------------------------------------------------------------
// Function: ParseCommandLine // Function: ParseCommandLine
// //
@ -61,7 +86,9 @@ typedef enum TaskCommandOptionType
// FALSE: otherwise // FALSE: otherwise
static BOOL ParseCommandLine(__in int argc, static BOOL ParseCommandLine(__in int argc,
__in_ecount(argc) wchar_t *argv[], __in_ecount(argc) wchar_t *argv[],
__out TaskCommandOption *command) __out TaskCommandOption *command,
__out_opt long *memory,
__out_opt long *vcore)
{ {
*command = TaskInvalid; *command = TaskInvalid;
@ -88,9 +115,44 @@ static BOOL ParseCommandLine(__in int argc,
} }
} }
if (argc == 4) { if (argc >= 4 && argc <= 8) {
if (wcscmp(argv[1], L"create") == 0) if (wcscmp(argv[1], L"create") == 0)
{ {
int i;
for (i = 2; i < argc - 3; i++)
{
if (wcscmp(argv[i], L"-c") == 0)
{
if (vcore != NULL && !GetLimit(argv[i + 1], vcore))
{
return FALSE;
}
else
{
i++;
continue;
}
}
else if (wcscmp(argv[i], L"-m") == 0)
{
if (memory != NULL && !GetLimit(argv[i + 1], memory))
{
return FALSE;
}
else
{
i++;
continue;
}
}
else
{
break;
}
}
if (argc - i != 2)
return FALSE;
*command = TaskCreate; *command = TaskCreate;
return TRUE; return TRUE;
} }
@ -573,7 +635,7 @@ done:
// ERROR_SUCCESS: On success // ERROR_SUCCESS: On success
// GetLastError: otherwise // GetLastError: otherwise
DWORD CreateTaskImpl(__in_opt HANDLE logonHandle, __in PCWSTR jobObjName,__in PCWSTR cmdLine, DWORD CreateTaskImpl(__in_opt HANDLE logonHandle, __in PCWSTR jobObjName,__in PCWSTR cmdLine,
__in LPCWSTR userName) __in LPCWSTR userName, __in long memory, __in long cpuRate)
{ {
DWORD dwErrorCode = ERROR_SUCCESS; DWORD dwErrorCode = ERROR_SUCCESS;
DWORD exitCode = EXIT_FAILURE; DWORD exitCode = EXIT_FAILURE;
@ -616,6 +678,12 @@ DWORD CreateTaskImpl(__in_opt HANDLE logonHandle, __in PCWSTR jobObjName,__in PC
return dwErrorCode; return dwErrorCode;
} }
jeli.BasicLimitInformation.LimitFlags = JOB_OBJECT_LIMIT_KILL_ON_JOB_CLOSE; jeli.BasicLimitInformation.LimitFlags = JOB_OBJECT_LIMIT_KILL_ON_JOB_CLOSE;
if (memory > 0)
{
jeli.BasicLimitInformation.LimitFlags |= JOB_OBJECT_LIMIT_JOB_MEMORY;
jeli.ProcessMemoryLimit = ((SIZE_T) memory) * 1024 * 1024;
jeli.JobMemoryLimit = ((SIZE_T) memory) * 1024 * 1024;
}
if(SetInformationJobObject(jobObject, if(SetInformationJobObject(jobObject,
JobObjectExtendedLimitInformation, JobObjectExtendedLimitInformation,
&jeli, &jeli,
@ -626,6 +694,24 @@ DWORD CreateTaskImpl(__in_opt HANDLE logonHandle, __in PCWSTR jobObjName,__in PC
CloseHandle(jobObject); CloseHandle(jobObject);
return dwErrorCode; return dwErrorCode;
} }
#ifdef NTDDI_WIN8
if (cpuRate > 0)
{
JOBOBJECT_CPU_RATE_CONTROL_INFORMATION jcrci = { 0 };
SYSTEM_INFO sysinfo;
GetSystemInfo(&sysinfo);
jcrci.ControlFlags = JOB_OBJECT_CPU_RATE_CONTROL_ENABLE |
JOB_OBJECT_CPU_RATE_CONTROL_HARD_CAP;
jcrci.CpuRate = min(10000, cpuRate);
if(SetInformationJobObject(jobObject, JobObjectCpuRateControlInformation,
&jcrci, sizeof(jcrci)) == 0)
{
dwErrorCode = GetLastError();
CloseHandle(jobObject);
return dwErrorCode;
}
}
#endif
if (logonHandle != NULL) { if (logonHandle != NULL) {
dwErrorCode = AddNodeManagerAndUserACEsToObject(jobObject, userName, JOB_OBJECT_ALL_ACCESS); dwErrorCode = AddNodeManagerAndUserACEsToObject(jobObject, userName, JOB_OBJECT_ALL_ACCESS);
@ -809,10 +895,10 @@ create_process_done:
// Returns: // Returns:
// ERROR_SUCCESS: On success // ERROR_SUCCESS: On success
// GetLastError: otherwise // GetLastError: otherwise
DWORD CreateTask(__in PCWSTR jobObjName,__in PWSTR cmdLine) DWORD CreateTask(__in PCWSTR jobObjName,__in PWSTR cmdLine, __in long memory, __in long cpuRate)
{ {
// call with null logon in order to create tasks utilizing the current logon // call with null logon in order to create tasks utilizing the current logon
return CreateTaskImpl( NULL, jobObjName, cmdLine, NULL); return CreateTaskImpl( NULL, jobObjName, cmdLine, NULL, memory, cpuRate);
} }
//---------------------------------------------------------------------------- //----------------------------------------------------------------------------
@ -893,7 +979,7 @@ DWORD CreateTaskAsUser(__in PCWSTR jobObjName,
goto done; goto done;
} }
err = CreateTaskImpl(logonHandle, jobObjName, cmdLine, user); err = CreateTaskImpl(logonHandle, jobObjName, cmdLine, user, -1, -1);
done: done:
if( profileIsLoaded ) { if( profileIsLoaded ) {
@ -1095,6 +1181,8 @@ int Task(__in int argc, __in_ecount(argc) wchar_t *argv[])
{ {
DWORD dwErrorCode = ERROR_SUCCESS; DWORD dwErrorCode = ERROR_SUCCESS;
TaskCommandOption command = TaskInvalid; TaskCommandOption command = TaskInvalid;
long memory = -1;
long cpuRate = -1;
wchar_t* cmdLine = NULL; wchar_t* cmdLine = NULL;
wchar_t buffer[16*1024] = L""; // 32K max command line wchar_t buffer[16*1024] = L""; // 32K max command line
size_t charCountBufferLeft = sizeof(buffer)/sizeof(wchar_t); size_t charCountBufferLeft = sizeof(buffer)/sizeof(wchar_t);
@ -1111,7 +1199,7 @@ int Task(__in int argc, __in_ecount(argc) wchar_t *argv[])
ARGC_COMMAND_ARGS ARGC_COMMAND_ARGS
}; };
if (!ParseCommandLine(argc, argv, &command)) { if (!ParseCommandLine(argc, argv, &command, &memory, &cpuRate)) {
dwErrorCode = ERROR_INVALID_COMMAND_LINE; dwErrorCode = ERROR_INVALID_COMMAND_LINE;
fwprintf(stderr, L"Incorrect command line arguments.\n\n"); fwprintf(stderr, L"Incorrect command line arguments.\n\n");
@ -1123,7 +1211,7 @@ int Task(__in int argc, __in_ecount(argc) wchar_t *argv[])
{ {
// Create the task jobobject // Create the task jobobject
// //
dwErrorCode = CreateTask(argv[2], argv[3]); dwErrorCode = CreateTask(argv[argc-2], argv[argc-1], memory, cpuRate);
if (dwErrorCode != ERROR_SUCCESS) if (dwErrorCode != ERROR_SUCCESS)
{ {
ReportErrorCode(L"CreateTask", dwErrorCode); ReportErrorCode(L"CreateTask", dwErrorCode);
@ -1238,18 +1326,30 @@ void TaskUsage()
// jobobject's are being used. // jobobject's are being used.
// ProcessTree.isSetsidSupported() // ProcessTree.isSetsidSupported()
fwprintf(stdout, L"\ fwprintf(stdout, L"\
Usage: task create [TASKNAME] [COMMAND_LINE] |\n\ Usage: task create [OPTOINS] [TASKNAME] [COMMAND_LINE]\n\
task createAsUser [TASKNAME] [USERNAME] [PIDFILE] [COMMAND_LINE] |\n\ Creates a new task job object with taskname and options to set CPU\n\
task isAlive [TASKNAME] |\n\ and memory limits on the job object\n\
task kill [TASKNAME]\n\ \n\
task processList [TASKNAME]\n\ OPTIONS: -c [cup rate] set the cpu rate limit on the job object.\n\
Creates a new task jobobject with taskname\n\ -m [memory] set the memory limit on the job object.\n\
Creates a new task jobobject with taskname as the user provided\n\ The cpu limit is an integral value of percentage * 100. The memory\n\
Checks if task jobobject is alive\n\ limit is an integral number of memory in MB. \n\
Kills task jobobject\n\ The limit will not be set if 0 or negative value is passed in as\n\
Prints to stdout a list of processes in the task\n\ parameter(s).\n\
along with their resource usage. One process per line\n\ \n\
and comma separated info per process\n\ task createAsUser [TASKNAME] [USERNAME] [PIDFILE] [COMMAND_LINE]\n\
ProcessId,VirtualMemoryCommitted(bytes),\n\ Creates a new task jobobject with taskname as the user provided\n\
WorkingSetSize(bytes),CpuTime(Millisec,Kernel+User)\n"); \n\
task isAlive [TASKNAME]\n\
Checks if task job object is alive\n\
\n\
task kill [TASKNAME]\n\
Kills task job object\n\
\n\
task processList [TASKNAME]\n\
Prints to stdout a list of processes in the task\n\
along with their resource usage. One process per line\n\
and comma separated info per process\n\
ProcessId,VirtualMemoryCommitted(bytes),\n\
WorkingSetSize(bytes),CpuTime(Millisec,Kernel+User)\n");
} }

View File

@ -0,0 +1,28 @@
<?xml version="1.0" encoding="utf-8"?>
<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<Project ToolsVersion="4.0" xmlns="http://schemas.microsoft.com/developer/msbuild/2003">
<ImportGroup Label="PropertySheets" />
<PropertyGroup Label="UserMacros" />
<PropertyGroup>
<ExecutablePath>$(VCInstallDir)bin\x86_amd64;$(VCInstallDir)bin;$(WindowsSdkDir)bin\NETFX 4.0 Tools;$(MSBuildProgramFiles32)\Windows Kits\8.1\bin\x86;$(VSInstallDir)Common7\Tools\bin;$(VSInstallDir)Common7\tools;$(VSInstallDir)Common7\ide;$(MSBuildProgramFiles32)\HTML Help Workshop;$(FrameworkSDKDir)\bin;$(MSBuildToolsPath32);$(VSInstallDir);$(SystemRoot)\SysWow64;$(FxCopDir);$(PATH)</ExecutablePath>
<IncludePath>$(MSBuildProgramFiles32)\Windows Kits\8.1\Include\um;$(MSBuildProgramFiles32)\Windows Kits\8.1\Include\shared;$(VCInstallDir)include;$(VCInstallDir)atlmfc\include;$(FrameworkSDKDir)\include;</IncludePath>
<LibraryPath>$(VCInstallDir)lib\amd64;$(VCInstallDir)atlmfc\lib\amd64;$(MSBuildProgramFiles32)\Windows Kits\8.1\lib\win8\um\x64;$(MSBuildProgramFiles32)\Windows Kits\8.1\Lib\winv6.3\um\x64;$(FrameworkSDKDir)\lib\x64</LibraryPath>
<ExcludePath>$(VCInstallDir)include;$(VCInstallDir)atlmfc\include;$(MSBuildProgramFiles32)\Windows Kits\8.1\Include\um;$(MSBuildProgramFiles32)\Windows Kits\8.1\Include\shared;$(FrameworkSDKDir)\include;$(MSBuildToolsPath32);$(VCInstallDir)atlmfc\lib;$(VCInstallDir)lib;</ExcludePath>
</PropertyGroup>
<ItemDefinitionGroup />
</Project>

View File

@ -67,6 +67,9 @@
</PropertyGroup> </PropertyGroup>
<ImportGroup Label="ExtensionSettings"> <ImportGroup Label="ExtensionSettings">
</ImportGroup> </ImportGroup>
<ImportGroup Label="PropertySheets" Condition="exists('$(MSBuildProgramFiles32)\Windows Kits\8.1')">
<Import Project="win8sdk.props" />
</ImportGroup>
<ImportGroup Condition="'$(Configuration)|$(Platform)'=='Debug|x64'" Label="PropertySheets"> <ImportGroup Condition="'$(Configuration)|$(Platform)'=='Debug|x64'" Label="PropertySheets">
<Import Project="$(UserRootDir)\Microsoft.Cpp.$(Platform).user.props" Condition="exists('$(UserRootDir)\Microsoft.Cpp.$(Platform).user.props')" Label="LocalAppDataPlatform" /> <Import Project="$(UserRootDir)\Microsoft.Cpp.$(Platform).user.props" Condition="exists('$(UserRootDir)\Microsoft.Cpp.$(Platform).user.props')" Label="LocalAppDataPlatform" />
</ImportGroup> </ImportGroup>

View File

@ -547,4 +547,66 @@ public class TestWinUtils {
assertThat(outNumber, containsString(testNumber)); assertThat(outNumber, containsString(testNumber));
} }
@Test (timeout = 30000)
public void testTaskCreateWithLimits() throws IOException {
// Generate a unique job id
String jobId = String.format("%f", Math.random());
// Run a task without any options
String out = Shell.execCommand(Shell.WINUTILS, "task", "create",
"job" + jobId, "cmd /c echo job" + jobId);
assertTrue(out.trim().equals("job" + jobId));
// Run a task without any limits
jobId = String.format("%f", Math.random());
out = Shell.execCommand(Shell.WINUTILS, "task", "create", "-c", "-1", "-m",
"-1", "job" + jobId, "cmd /c echo job" + jobId);
assertTrue(out.trim().equals("job" + jobId));
// Run a task with limits (128MB should be enough for a cmd)
jobId = String.format("%f", Math.random());
out = Shell.execCommand(Shell.WINUTILS, "task", "create", "-c", "10000", "-m",
"128", "job" + jobId, "cmd /c echo job" + jobId);
assertTrue(out.trim().equals("job" + jobId));
// Run a task without enough memory
try {
jobId = String.format("%f", Math.random());
out = Shell.execCommand(Shell.WINUTILS, "task", "create", "-m", "128", "job"
+ jobId, "java -Xmx256m -version");
fail("Failed to get Shell.ExitCodeException with insufficient memory");
} catch (Shell.ExitCodeException ece) {
assertThat(ece.getExitCode(), is(1));
}
// Run tasks with wrong parameters
//
try {
jobId = String.format("%f", Math.random());
Shell.execCommand(Shell.WINUTILS, "task", "create", "-c", "-1", "-m",
"-1", "foo", "job" + jobId, "cmd /c echo job" + jobId);
fail("Failed to get Shell.ExitCodeException with bad parameters");
} catch (Shell.ExitCodeException ece) {
assertThat(ece.getExitCode(), is(1639));
}
try {
jobId = String.format("%f", Math.random());
Shell.execCommand(Shell.WINUTILS, "task", "create", "-c", "-m", "-1",
"job" + jobId, "cmd /c echo job" + jobId);
fail("Failed to get Shell.ExitCodeException with bad parameters");
} catch (Shell.ExitCodeException ece) {
assertThat(ece.getExitCode(), is(1639));
}
try {
jobId = String.format("%f", Math.random());
Shell.execCommand(Shell.WINUTILS, "task", "create", "-c", "foo",
"job" + jobId, "cmd /c echo job" + jobId);
fail("Failed to get Shell.ExitCodeException with bad parameters");
} catch (Shell.ExitCodeException ece) {
assertThat(ece.getExitCode(), is(1639));
}
}
} }

View File

@ -363,6 +363,9 @@ Release 2.7.0 - UNRELEASED
YARN-1809. Synchronize RM and TimeLineServer Web-UIs. (Zhijie Shen and YARN-1809. Synchronize RM and TimeLineServer Web-UIs. (Zhijie Shen and
Xuan Gong via jianhe) Xuan Gong via jianhe)
YARN-2190. Added CPU and memory limit options to the default container
executor for Windows containers. (Chuan Liu via jianhe)
OPTIMIZATIONS OPTIMIZATIONS
YARN-2990. FairScheduler's delay-scheduling always waits for node-local and YARN-2990. FairScheduler's delay-scheduling always waits for node-local and

View File

@ -1027,6 +1027,18 @@ public class YarnConfiguration extends Configuration {
public static final long DEFAULT_NM_LINUX_CONTAINER_CGROUPS_DELETE_DELAY = public static final long DEFAULT_NM_LINUX_CONTAINER_CGROUPS_DELETE_DELAY =
20; 20;
/**
* Indicates if memory and CPU limits will be set for the Windows Job
* Object for the containers launched by the default container executor.
*/
public static final String NM_WINDOWS_CONTAINER_MEMORY_LIMIT_ENABLED =
NM_PREFIX + "windows-container.memory-limit.enabled";
public static final boolean DEFAULT_NM_WINDOWS_CONTAINER_MEMORY_LIMIT_ENABLED = false;
public static final String NM_WINDOWS_CONTAINER_CPU_LIMIT_ENABLED =
NM_PREFIX + "windows-container.cpu-limit.enabled";
public static final boolean DEFAULT_NM_WINDOWS_CONTAINER_CPU_LIMIT_ENABLED = false;
/** /**
/* The Windows group that the windows-secure-container-executor should run as. /* The Windows group that the windows-secure-container-executor should run as.
*/ */

View File

@ -1074,6 +1074,20 @@
<value>false</value> <value>false</value>
</property> </property>
<property>
<description>This flag determines whether memory limit will be set for the Windows Job
Object of the containers launched by the default container executor.</description>
<name>yarn.nodemanager.windows-container.memory-limit.enabled</name>
<value>false</value>
</property>
<property>
<description>This flag determines whether CPU limit will be set for the Windows Job
Object of the containers launched by the default container executor.</description>
<name>yarn.nodemanager.windows-container.cpu-limit.enabled</name>
<value>false</value>
</property>
<property> <property>
<description>T-file compression types used to compress aggregated logs.</description> <description>T-file compression types used to compress aggregated logs.</description>
<name>yarn.nodemanager.log-aggregation.compression-type</name> <name>yarn.nodemanager.log-aggregation.compression-type</name>

View File

@ -41,6 +41,7 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission; import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container; import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerDiagnosticsUpdateEvent; import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerDiagnosticsUpdateEvent;
@ -298,6 +299,11 @@ public abstract class ContainerExecutor implements Configurable {
readLock.unlock(); readLock.unlock();
} }
} }
protected String[] getRunCommand(String command, String groupId,
String userName, Path pidFile, Configuration conf) {
return getRunCommand(command, groupId, userName, pidFile, conf, null);
}
/** /**
* Return a command to execute the given command in OS shell. * Return a command to execute the given command in OS shell.
@ -306,7 +312,7 @@ public abstract class ContainerExecutor implements Configurable {
* non-Windows, groupId is ignored. * non-Windows, groupId is ignored.
*/ */
protected String[] getRunCommand(String command, String groupId, protected String[] getRunCommand(String command, String groupId,
String userName, Path pidFile, Configuration conf) { String userName, Path pidFile, Configuration conf, Resource resource) {
boolean containerSchedPriorityIsSet = false; boolean containerSchedPriorityIsSet = false;
int containerSchedPriorityAdjustment = int containerSchedPriorityAdjustment =
YarnConfiguration.DEFAULT_NM_CONTAINER_EXECUTOR_SCHED_PRIORITY; YarnConfiguration.DEFAULT_NM_CONTAINER_EXECUTOR_SCHED_PRIORITY;
@ -320,7 +326,46 @@ public abstract class ContainerExecutor implements Configurable {
} }
if (Shell.WINDOWS) { if (Shell.WINDOWS) {
return new String[] { Shell.WINUTILS, "task", "create", groupId, int cpuRate = -1;
int memory = -1;
if (resource != null) {
if (conf
.getBoolean(
YarnConfiguration.NM_WINDOWS_CONTAINER_MEMORY_LIMIT_ENABLED,
YarnConfiguration.DEFAULT_NM_WINDOWS_CONTAINER_MEMORY_LIMIT_ENABLED)) {
memory = resource.getMemory();
}
if (conf.getBoolean(
YarnConfiguration.NM_WINDOWS_CONTAINER_CPU_LIMIT_ENABLED,
YarnConfiguration.DEFAULT_NM_WINDOWS_CONTAINER_CPU_LIMIT_ENABLED)) {
int containerVCores = resource.getVirtualCores();
int nodeVCores = conf.getInt(YarnConfiguration.NM_VCORES,
YarnConfiguration.DEFAULT_NM_VCORES);
// cap overall usage to the number of cores allocated to YARN
int nodeCpuPercentage = Math
.min(
conf.getInt(
YarnConfiguration.NM_RESOURCE_PERCENTAGE_PHYSICAL_CPU_LIMIT,
YarnConfiguration.DEFAULT_NM_RESOURCE_PERCENTAGE_PHYSICAL_CPU_LIMIT),
100);
nodeCpuPercentage = Math.max(0, nodeCpuPercentage);
if (nodeCpuPercentage == 0) {
String message = "Illegal value for "
+ YarnConfiguration.NM_RESOURCE_PERCENTAGE_PHYSICAL_CPU_LIMIT
+ ". Value cannot be less than or equal to 0.";
throw new IllegalArgumentException(message);
}
float yarnVCores = (nodeCpuPercentage * nodeVCores) / 100.0f;
// CPU should be set to a percentage * 100, e.g. 20% cpu rate limit
// should be set as 20 * 100. The following setting is equal to:
// 100 * (100 * (vcores / Total # of cores allocated to YARN))
cpuRate = Math.min(10000,
(int) ((containerVCores * 10000) / yarnVCores));
}
}
return new String[] { Shell.WINUTILS, "task", "create", "-m",
String.valueOf(memory), "-c", String.valueOf(cpuRate), groupId,
"cmd /c " + command }; "cmd /c " + command };
} else { } else {
List<String> retCommand = new ArrayList<String>(); List<String> retCommand = new ArrayList<String>();

View File

@ -48,6 +48,7 @@ import org.apache.hadoop.util.Shell.CommandExecutor;
import org.apache.hadoop.util.Shell.ShellCommandExecutor; import org.apache.hadoop.util.Shell.ShellCommandExecutor;
import org.apache.hadoop.util.StringUtils; import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container; import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
@ -202,7 +203,7 @@ public class DefaultContainerExecutor extends ContainerExecutor {
setScriptExecutable(sb.getWrapperScriptPath(), user); setScriptExecutable(sb.getWrapperScriptPath(), user);
shExec = buildCommandExecutor(sb.getWrapperScriptPath().toString(), shExec = buildCommandExecutor(sb.getWrapperScriptPath().toString(),
containerIdStr, user, pidFile, containerIdStr, user, pidFile, container.getResource(),
new File(containerWorkDir.toUri().getPath()), new File(containerWorkDir.toUri().getPath()),
container.getLaunchContext().getEnvironment()); container.getLaunchContext().getEnvironment());
@ -256,12 +257,12 @@ public class DefaultContainerExecutor extends ContainerExecutor {
} }
protected CommandExecutor buildCommandExecutor(String wrapperScriptPath, protected CommandExecutor buildCommandExecutor(String wrapperScriptPath,
String containerIdStr, String user, Path pidFile, File wordDir, String containerIdStr, String user, Path pidFile, Resource resource,
Map<String, String> environment) File wordDir, Map<String, String> environment)
throws IOException { throws IOException {
String[] command = getRunCommand(wrapperScriptPath, String[] command = getRunCommand(wrapperScriptPath,
containerIdStr, user, pidFile, this.getConf()); containerIdStr, user, pidFile, this.getConf(), resource);
LOG.info("launchContainer: " + Arrays.toString(command)); LOG.info("launchContainer: " + Arrays.toString(command));
return new ShellCommandExecutor( return new ShellCommandExecutor(

View File

@ -52,6 +52,7 @@ import org.apache.hadoop.io.nativeio.NativeIOException;
import org.apache.hadoop.util.NativeCodeLoader; import org.apache.hadoop.util.NativeCodeLoader;
import org.apache.hadoop.util.Shell; import org.apache.hadoop.util.Shell;
import org.apache.hadoop.util.Shell.CommandExecutor; import org.apache.hadoop.util.Shell.CommandExecutor;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ContainerLocalizer; import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ContainerLocalizer;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ResourceLocalizationService; import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ResourceLocalizationService;
@ -727,11 +728,9 @@ public class WindowsSecureContainerExecutor extends DefaultContainerExecutor {
} }
@Override @Override
protected CommandExecutor buildCommandExecutor(String wrapperScriptPath, protected CommandExecutor buildCommandExecutor(String wrapperScriptPath,
String containerIdStr, String containerIdStr, String userName, Path pidFile, Resource resource,
String userName, Path pidFile,File wordDir, Map<String, String> environment) File wordDir, Map<String, String> environment) throws IOException {
throws IOException {
return new WintuilsProcessStubExecutor( return new WintuilsProcessStubExecutor(
wordDir.toString(), wordDir.toString(),
containerIdStr, userName, pidFile.toString(), containerIdStr, userName, pidFile.toString(),

View File

@ -18,13 +18,21 @@
package org.apache.hadoop.yarn.server.nodemanager; package org.apache.hadoop.yarn.server.nodemanager;
import java.util.Arrays;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.util.Shell; import org.apache.hadoop.util.Shell;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.server.nodemanager.util.NodeManagerHardwareUtils;
import org.apache.hadoop.yarn.util.ResourceCalculatorPlugin;
import org.junit.Assert;
import org.junit.Test; import org.junit.Test;
import static org.junit.Assert.*; import static org.junit.Assert.*;
import static org.junit.Assume.assumeTrue;
public class TestContainerExecutor { public class TestContainerExecutor {
@ -69,4 +77,49 @@ public class TestContainerExecutor {
} }
} }
@Test (timeout = 5000)
public void testRunCommandWithNoResources() {
// Windows only test
assumeTrue(Shell.WINDOWS);
Configuration conf = new Configuration();
String[] command = containerExecutor.getRunCommand("echo", "group1", null, null,
conf, Resource.newInstance(1024, 1));
// Assert the cpu and memory limits are set correctly in the command
String[] expected = { Shell.WINUTILS, "task", "create", "-m", "-1", "-c",
"-1", "group1", "cmd /c " + "echo" };
Assert.assertTrue(Arrays.equals(expected, command));
}
@Test (timeout = 5000)
public void testRunCommandWithMemoryOnlyResources() {
// Windows only test
assumeTrue(Shell.WINDOWS);
Configuration conf = new Configuration();
conf.set(YarnConfiguration.NM_WINDOWS_CONTAINER_MEMORY_LIMIT_ENABLED, "true");
String[] command = containerExecutor.getRunCommand("echo", "group1", null, null,
conf, Resource.newInstance(1024, 1));
// Assert the cpu and memory limits are set correctly in the command
String[] expected = { Shell.WINUTILS, "task", "create", "-m", "1024", "-c",
"-1", "group1", "cmd /c " + "echo" };
Assert.assertTrue(Arrays.equals(expected, command));
}
@Test (timeout = 5000)
public void testRunCommandWithCpuAndMemoryResources() {
// Windows only test
assumeTrue(Shell.WINDOWS);
Configuration conf = new Configuration();
conf.set(YarnConfiguration.NM_WINDOWS_CONTAINER_CPU_LIMIT_ENABLED, "true");
conf.set(YarnConfiguration.NM_WINDOWS_CONTAINER_MEMORY_LIMIT_ENABLED, "true");
String[] command = containerExecutor.getRunCommand("echo", "group1", null, null,
conf, Resource.newInstance(1024, 1));
float yarnProcessors = NodeManagerHardwareUtils.getContainersCores(
ResourceCalculatorPlugin.getResourceCalculatorPlugin(null, conf),
conf);
int cpuRate = Math.min(10000, (int) ((1 * 10000) / yarnProcessors));
// Assert the cpu and memory limits are set correctly in the command
String[] expected = { Shell.WINUTILS, "task", "create", "-m", "1024", "-c",
String.valueOf(cpuRate), "group1", "cmd /c " + "echo" };
Assert.assertTrue(Arrays.equals(expected, command));
}
} }