diff --git a/BUILDING.txt b/BUILDING.txt
index 6e38ad374a1..b60da6c4163 100644
--- a/BUILDING.txt
+++ b/BUILDING.txt
@@ -209,7 +209,8 @@ Requirements:
* Findbugs 1.3.9 (if running findbugs)
* ProtocolBuffer 2.5.0
* CMake 2.6 or newer
-* Windows SDK or Visual Studio 2010 Professional
+* Windows SDK 7.1 or Visual Studio 2010 Professional
+* Windows SDK 8.1 (if building CPU rate control for the container executor)
* zlib headers (if building native code bindings for zlib)
* Internet connection for first build (to fetch all Maven and Hadoop dependencies)
* Unix command-line tools from GnuWin32: sh, mkdir, rm, cp, tar, gzip. These
@@ -220,11 +221,15 @@ can be downloaded from http://git-scm.com/download/win.
If using Visual Studio, it must be Visual Studio 2010 Professional (not 2012).
Do not use Visual Studio Express. It does not support compiling for 64-bit,
-which is problematic if running a 64-bit system. The Windows SDK is free to
+which is problematic if running a 64-bit system. The Windows SDK 7.1 is free to
download here:
http://www.microsoft.com/en-us/download/details.aspx?id=8279
+The Windows SDK 8.1 is available to download at:
+
+http://msdn.microsoft.com/en-us/windows/bg162891.aspx
+
Cygwin is neither required nor supported.
----------------------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/winutils/task.c b/hadoop-common-project/hadoop-common/src/main/winutils/task.c
index 21b18939533..37c6ca136a8 100644
--- a/hadoop-common-project/hadoop-common/src/main/winutils/task.c
+++ b/hadoop-common-project/hadoop-common/src/main/winutils/task.c
@@ -49,6 +49,31 @@ typedef enum TaskCommandOptionType
TaskProcessList
} TaskCommandOption;
+ //----------------------------------------------------------------------------
+// Function: GetLimit
+//
+// Description:
+// Get the resource limit value in long type given the command line argument.
+//
+// Returns:
+// TRUE: If successfully get the value
+// FALSE: otherwise
+static BOOL GetLimit(__in const wchar_t *str, __out long *value)
+{
+ wchar_t *end = NULL;
+ if (str == NULL || value == NULL) return FALSE;
+ *value = wcstol(str, &end, 10);
+ if (end == NULL || *end != '\0')
+ {
+ *value = -1;
+ return FALSE;
+ }
+ else
+ {
+ return TRUE;
+ }
+}
+
//----------------------------------------------------------------------------
// Function: ParseCommandLine
//
@@ -61,7 +86,9 @@ typedef enum TaskCommandOptionType
// FALSE: otherwise
static BOOL ParseCommandLine(__in int argc,
__in_ecount(argc) wchar_t *argv[],
- __out TaskCommandOption *command)
+ __out TaskCommandOption *command,
+ __out_opt long *memory,
+ __out_opt long *vcore)
{
*command = TaskInvalid;
@@ -88,9 +115,44 @@ static BOOL ParseCommandLine(__in int argc,
}
}
- if (argc == 4) {
+ if (argc >= 4 && argc <= 8) {
if (wcscmp(argv[1], L"create") == 0)
{
+ int i;
+ for (i = 2; i < argc - 3; i++)
+ {
+ if (wcscmp(argv[i], L"-c") == 0)
+ {
+ if (vcore != NULL && !GetLimit(argv[i + 1], vcore))
+ {
+ return FALSE;
+ }
+ else
+ {
+ i++;
+ continue;
+ }
+ }
+ else if (wcscmp(argv[i], L"-m") == 0)
+ {
+ if (memory != NULL && !GetLimit(argv[i + 1], memory))
+ {
+ return FALSE;
+ }
+ else
+ {
+ i++;
+ continue;
+ }
+ }
+ else
+ {
+ break;
+ }
+ }
+ if (argc - i != 2)
+ return FALSE;
+
*command = TaskCreate;
return TRUE;
}
@@ -573,7 +635,7 @@ done:
// ERROR_SUCCESS: On success
// GetLastError: otherwise
DWORD CreateTaskImpl(__in_opt HANDLE logonHandle, __in PCWSTR jobObjName,__in PCWSTR cmdLine,
- __in LPCWSTR userName)
+ __in LPCWSTR userName, __in long memory, __in long cpuRate)
{
DWORD dwErrorCode = ERROR_SUCCESS;
DWORD exitCode = EXIT_FAILURE;
@@ -616,6 +678,12 @@ DWORD CreateTaskImpl(__in_opt HANDLE logonHandle, __in PCWSTR jobObjName,__in PC
return dwErrorCode;
}
jeli.BasicLimitInformation.LimitFlags = JOB_OBJECT_LIMIT_KILL_ON_JOB_CLOSE;
+ if (memory > 0)
+ {
+ jeli.BasicLimitInformation.LimitFlags |= JOB_OBJECT_LIMIT_JOB_MEMORY;
+ jeli.ProcessMemoryLimit = ((SIZE_T) memory) * 1024 * 1024;
+ jeli.JobMemoryLimit = ((SIZE_T) memory) * 1024 * 1024;
+ }
if(SetInformationJobObject(jobObject,
JobObjectExtendedLimitInformation,
&jeli,
@@ -626,6 +694,24 @@ DWORD CreateTaskImpl(__in_opt HANDLE logonHandle, __in PCWSTR jobObjName,__in PC
CloseHandle(jobObject);
return dwErrorCode;
}
+#ifdef NTDDI_WIN8
+ if (cpuRate > 0)
+ {
+ JOBOBJECT_CPU_RATE_CONTROL_INFORMATION jcrci = { 0 };
+ SYSTEM_INFO sysinfo;
+ GetSystemInfo(&sysinfo);
+ jcrci.ControlFlags = JOB_OBJECT_CPU_RATE_CONTROL_ENABLE |
+ JOB_OBJECT_CPU_RATE_CONTROL_HARD_CAP;
+ jcrci.CpuRate = min(10000, cpuRate);
+ if(SetInformationJobObject(jobObject, JobObjectCpuRateControlInformation,
+ &jcrci, sizeof(jcrci)) == 0)
+ {
+ dwErrorCode = GetLastError();
+ CloseHandle(jobObject);
+ return dwErrorCode;
+ }
+ }
+#endif
if (logonHandle != NULL) {
dwErrorCode = AddNodeManagerAndUserACEsToObject(jobObject, userName, JOB_OBJECT_ALL_ACCESS);
@@ -809,10 +895,10 @@ create_process_done:
// Returns:
// ERROR_SUCCESS: On success
// GetLastError: otherwise
-DWORD CreateTask(__in PCWSTR jobObjName,__in PWSTR cmdLine)
+DWORD CreateTask(__in PCWSTR jobObjName,__in PWSTR cmdLine, __in long memory, __in long cpuRate)
{
// call with null logon in order to create tasks utilizing the current logon
- return CreateTaskImpl( NULL, jobObjName, cmdLine, NULL);
+ return CreateTaskImpl( NULL, jobObjName, cmdLine, NULL, memory, cpuRate);
}
//----------------------------------------------------------------------------
@@ -893,7 +979,7 @@ DWORD CreateTaskAsUser(__in PCWSTR jobObjName,
goto done;
}
- err = CreateTaskImpl(logonHandle, jobObjName, cmdLine, user);
+ err = CreateTaskImpl(logonHandle, jobObjName, cmdLine, user, -1, -1);
done:
if( profileIsLoaded ) {
@@ -1095,6 +1181,8 @@ int Task(__in int argc, __in_ecount(argc) wchar_t *argv[])
{
DWORD dwErrorCode = ERROR_SUCCESS;
TaskCommandOption command = TaskInvalid;
+ long memory = -1;
+ long cpuRate = -1;
wchar_t* cmdLine = NULL;
wchar_t buffer[16*1024] = L""; // 32K max command line
size_t charCountBufferLeft = sizeof(buffer)/sizeof(wchar_t);
@@ -1111,7 +1199,7 @@ int Task(__in int argc, __in_ecount(argc) wchar_t *argv[])
ARGC_COMMAND_ARGS
};
- if (!ParseCommandLine(argc, argv, &command)) {
+ if (!ParseCommandLine(argc, argv, &command, &memory, &cpuRate)) {
dwErrorCode = ERROR_INVALID_COMMAND_LINE;
fwprintf(stderr, L"Incorrect command line arguments.\n\n");
@@ -1123,7 +1211,7 @@ int Task(__in int argc, __in_ecount(argc) wchar_t *argv[])
{
// Create the task jobobject
//
- dwErrorCode = CreateTask(argv[2], argv[3]);
+ dwErrorCode = CreateTask(argv[argc-2], argv[argc-1], memory, cpuRate);
if (dwErrorCode != ERROR_SUCCESS)
{
ReportErrorCode(L"CreateTask", dwErrorCode);
@@ -1238,18 +1326,30 @@ void TaskUsage()
// jobobject's are being used.
// ProcessTree.isSetsidSupported()
fwprintf(stdout, L"\
- Usage: task create [TASKNAME] [COMMAND_LINE] |\n\
- task createAsUser [TASKNAME] [USERNAME] [PIDFILE] [COMMAND_LINE] |\n\
- task isAlive [TASKNAME] |\n\
- task kill [TASKNAME]\n\
- task processList [TASKNAME]\n\
- Creates a new task jobobject with taskname\n\
- Creates a new task jobobject with taskname as the user provided\n\
- Checks if task jobobject is alive\n\
- Kills task jobobject\n\
- Prints to stdout a list of processes in the task\n\
- along with their resource usage. One process per line\n\
- and comma separated info per process\n\
- ProcessId,VirtualMemoryCommitted(bytes),\n\
- WorkingSetSize(bytes),CpuTime(Millisec,Kernel+User)\n");
+Usage: task create [OPTOINS] [TASKNAME] [COMMAND_LINE]\n\
+ Creates a new task job object with taskname and options to set CPU\n\
+ and memory limits on the job object\n\
+\n\
+ OPTIONS: -c [cup rate] set the cpu rate limit on the job object.\n\
+ -m [memory] set the memory limit on the job object.\n\
+ The cpu limit is an integral value of percentage * 100. The memory\n\
+ limit is an integral number of memory in MB. \n\
+ The limit will not be set if 0 or negative value is passed in as\n\
+ parameter(s).\n\
+\n\
+ task createAsUser [TASKNAME] [USERNAME] [PIDFILE] [COMMAND_LINE]\n\
+ Creates a new task jobobject with taskname as the user provided\n\
+\n\
+ task isAlive [TASKNAME]\n\
+ Checks if task job object is alive\n\
+\n\
+ task kill [TASKNAME]\n\
+ Kills task job object\n\
+\n\
+ task processList [TASKNAME]\n\
+ Prints to stdout a list of processes in the task\n\
+ along with their resource usage. One process per line\n\
+ and comma separated info per process\n\
+ ProcessId,VirtualMemoryCommitted(bytes),\n\
+ WorkingSetSize(bytes),CpuTime(Millisec,Kernel+User)\n");
}
diff --git a/hadoop-common-project/hadoop-common/src/main/winutils/win8sdk.props b/hadoop-common-project/hadoop-common/src/main/winutils/win8sdk.props
new file mode 100644
index 00000000000..503b37ad5c5
--- /dev/null
+++ b/hadoop-common-project/hadoop-common/src/main/winutils/win8sdk.props
@@ -0,0 +1,28 @@
+
+
+
+
+
+
+ $(VCInstallDir)bin\x86_amd64;$(VCInstallDir)bin;$(WindowsSdkDir)bin\NETFX 4.0 Tools;$(MSBuildProgramFiles32)\Windows Kits\8.1\bin\x86;$(VSInstallDir)Common7\Tools\bin;$(VSInstallDir)Common7\tools;$(VSInstallDir)Common7\ide;$(MSBuildProgramFiles32)\HTML Help Workshop;$(FrameworkSDKDir)\bin;$(MSBuildToolsPath32);$(VSInstallDir);$(SystemRoot)\SysWow64;$(FxCopDir);$(PATH)
+ $(MSBuildProgramFiles32)\Windows Kits\8.1\Include\um;$(MSBuildProgramFiles32)\Windows Kits\8.1\Include\shared;$(VCInstallDir)include;$(VCInstallDir)atlmfc\include;$(FrameworkSDKDir)\include;
+ $(VCInstallDir)lib\amd64;$(VCInstallDir)atlmfc\lib\amd64;$(MSBuildProgramFiles32)\Windows Kits\8.1\lib\win8\um\x64;$(MSBuildProgramFiles32)\Windows Kits\8.1\Lib\winv6.3\um\x64;$(FrameworkSDKDir)\lib\x64
+ $(VCInstallDir)include;$(VCInstallDir)atlmfc\include;$(MSBuildProgramFiles32)\Windows Kits\8.1\Include\um;$(MSBuildProgramFiles32)\Windows Kits\8.1\Include\shared;$(FrameworkSDKDir)\include;$(MSBuildToolsPath32);$(VCInstallDir)atlmfc\lib;$(VCInstallDir)lib;
+
+
+
diff --git a/hadoop-common-project/hadoop-common/src/main/winutils/winutils.vcxproj b/hadoop-common-project/hadoop-common/src/main/winutils/winutils.vcxproj
index 9ecba0a87a7..76a74147e1f 100644
--- a/hadoop-common-project/hadoop-common/src/main/winutils/winutils.vcxproj
+++ b/hadoop-common-project/hadoop-common/src/main/winutils/winutils.vcxproj
@@ -67,6 +67,9 @@
+
+
+
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/util/TestWinUtils.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/util/TestWinUtils.java
index 8ac6e40a045..987c7068a82 100644
--- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/util/TestWinUtils.java
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/util/TestWinUtils.java
@@ -547,4 +547,66 @@ public class TestWinUtils {
assertThat(outNumber, containsString(testNumber));
}
+
+ @Test (timeout = 30000)
+ public void testTaskCreateWithLimits() throws IOException {
+ // Generate a unique job id
+ String jobId = String.format("%f", Math.random());
+
+ // Run a task without any options
+ String out = Shell.execCommand(Shell.WINUTILS, "task", "create",
+ "job" + jobId, "cmd /c echo job" + jobId);
+ assertTrue(out.trim().equals("job" + jobId));
+
+ // Run a task without any limits
+ jobId = String.format("%f", Math.random());
+ out = Shell.execCommand(Shell.WINUTILS, "task", "create", "-c", "-1", "-m",
+ "-1", "job" + jobId, "cmd /c echo job" + jobId);
+ assertTrue(out.trim().equals("job" + jobId));
+
+ // Run a task with limits (128MB should be enough for a cmd)
+ jobId = String.format("%f", Math.random());
+ out = Shell.execCommand(Shell.WINUTILS, "task", "create", "-c", "10000", "-m",
+ "128", "job" + jobId, "cmd /c echo job" + jobId);
+ assertTrue(out.trim().equals("job" + jobId));
+
+ // Run a task without enough memory
+ try {
+ jobId = String.format("%f", Math.random());
+ out = Shell.execCommand(Shell.WINUTILS, "task", "create", "-m", "128", "job"
+ + jobId, "java -Xmx256m -version");
+ fail("Failed to get Shell.ExitCodeException with insufficient memory");
+ } catch (Shell.ExitCodeException ece) {
+ assertThat(ece.getExitCode(), is(1));
+ }
+
+ // Run tasks with wrong parameters
+ //
+ try {
+ jobId = String.format("%f", Math.random());
+ Shell.execCommand(Shell.WINUTILS, "task", "create", "-c", "-1", "-m",
+ "-1", "foo", "job" + jobId, "cmd /c echo job" + jobId);
+ fail("Failed to get Shell.ExitCodeException with bad parameters");
+ } catch (Shell.ExitCodeException ece) {
+ assertThat(ece.getExitCode(), is(1639));
+ }
+
+ try {
+ jobId = String.format("%f", Math.random());
+ Shell.execCommand(Shell.WINUTILS, "task", "create", "-c", "-m", "-1",
+ "job" + jobId, "cmd /c echo job" + jobId);
+ fail("Failed to get Shell.ExitCodeException with bad parameters");
+ } catch (Shell.ExitCodeException ece) {
+ assertThat(ece.getExitCode(), is(1639));
+ }
+
+ try {
+ jobId = String.format("%f", Math.random());
+ Shell.execCommand(Shell.WINUTILS, "task", "create", "-c", "foo",
+ "job" + jobId, "cmd /c echo job" + jobId);
+ fail("Failed to get Shell.ExitCodeException with bad parameters");
+ } catch (Shell.ExitCodeException ece) {
+ assertThat(ece.getExitCode(), is(1639));
+ }
+ }
}
diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt
index d073169d384..c2aa2eff40f 100644
--- a/hadoop-yarn-project/CHANGES.txt
+++ b/hadoop-yarn-project/CHANGES.txt
@@ -363,6 +363,9 @@ Release 2.7.0 - UNRELEASED
YARN-1809. Synchronize RM and TimeLineServer Web-UIs. (Zhijie Shen and
Xuan Gong via jianhe)
+ YARN-2190. Added CPU and memory limit options to the default container
+ executor for Windows containers. (Chuan Liu via jianhe)
+
OPTIMIZATIONS
YARN-2990. FairScheduler's delay-scheduling always waits for node-local and
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
index 25b808e782e..8c83fea9cb7 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
@@ -1027,6 +1027,18 @@ public class YarnConfiguration extends Configuration {
public static final long DEFAULT_NM_LINUX_CONTAINER_CGROUPS_DELETE_DELAY =
20;
+ /**
+ * Indicates if memory and CPU limits will be set for the Windows Job
+ * Object for the containers launched by the default container executor.
+ */
+ public static final String NM_WINDOWS_CONTAINER_MEMORY_LIMIT_ENABLED =
+ NM_PREFIX + "windows-container.memory-limit.enabled";
+ public static final boolean DEFAULT_NM_WINDOWS_CONTAINER_MEMORY_LIMIT_ENABLED = false;
+
+ public static final String NM_WINDOWS_CONTAINER_CPU_LIMIT_ENABLED =
+ NM_PREFIX + "windows-container.cpu-limit.enabled";
+ public static final boolean DEFAULT_NM_WINDOWS_CONTAINER_CPU_LIMIT_ENABLED = false;
+
/**
/* The Windows group that the windows-secure-container-executor should run as.
*/
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
index df730d565c5..66400c8831f 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
@@ -1074,6 +1074,20 @@
false
+
+ This flag determines whether memory limit will be set for the Windows Job
+ Object of the containers launched by the default container executor.
+ yarn.nodemanager.windows-container.memory-limit.enabled
+ false
+
+
+
+ This flag determines whether CPU limit will be set for the Windows Job
+ Object of the containers launched by the default container executor.
+ yarn.nodemanager.windows-container.cpu-limit.enabled
+ false
+
+
T-file compression types used to compress aggregated logs.
yarn.nodemanager.log-aggregation.compression-type
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/ContainerExecutor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/ContainerExecutor.java
index 77193df3280..248a393719c 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/ContainerExecutor.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/ContainerExecutor.java
@@ -41,6 +41,7 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerDiagnosticsUpdateEvent;
@@ -298,6 +299,11 @@ public abstract class ContainerExecutor implements Configurable {
readLock.unlock();
}
}
+
+ protected String[] getRunCommand(String command, String groupId,
+ String userName, Path pidFile, Configuration conf) {
+ return getRunCommand(command, groupId, userName, pidFile, conf, null);
+ }
/**
* Return a command to execute the given command in OS shell.
@@ -306,7 +312,7 @@ public abstract class ContainerExecutor implements Configurable {
* non-Windows, groupId is ignored.
*/
protected String[] getRunCommand(String command, String groupId,
- String userName, Path pidFile, Configuration conf) {
+ String userName, Path pidFile, Configuration conf, Resource resource) {
boolean containerSchedPriorityIsSet = false;
int containerSchedPriorityAdjustment =
YarnConfiguration.DEFAULT_NM_CONTAINER_EXECUTOR_SCHED_PRIORITY;
@@ -320,7 +326,46 @@ public abstract class ContainerExecutor implements Configurable {
}
if (Shell.WINDOWS) {
- return new String[] { Shell.WINUTILS, "task", "create", groupId,
+ int cpuRate = -1;
+ int memory = -1;
+ if (resource != null) {
+ if (conf
+ .getBoolean(
+ YarnConfiguration.NM_WINDOWS_CONTAINER_MEMORY_LIMIT_ENABLED,
+ YarnConfiguration.DEFAULT_NM_WINDOWS_CONTAINER_MEMORY_LIMIT_ENABLED)) {
+ memory = resource.getMemory();
+ }
+
+ if (conf.getBoolean(
+ YarnConfiguration.NM_WINDOWS_CONTAINER_CPU_LIMIT_ENABLED,
+ YarnConfiguration.DEFAULT_NM_WINDOWS_CONTAINER_CPU_LIMIT_ENABLED)) {
+ int containerVCores = resource.getVirtualCores();
+ int nodeVCores = conf.getInt(YarnConfiguration.NM_VCORES,
+ YarnConfiguration.DEFAULT_NM_VCORES);
+ // cap overall usage to the number of cores allocated to YARN
+ int nodeCpuPercentage = Math
+ .min(
+ conf.getInt(
+ YarnConfiguration.NM_RESOURCE_PERCENTAGE_PHYSICAL_CPU_LIMIT,
+ YarnConfiguration.DEFAULT_NM_RESOURCE_PERCENTAGE_PHYSICAL_CPU_LIMIT),
+ 100);
+ nodeCpuPercentage = Math.max(0, nodeCpuPercentage);
+ if (nodeCpuPercentage == 0) {
+ String message = "Illegal value for "
+ + YarnConfiguration.NM_RESOURCE_PERCENTAGE_PHYSICAL_CPU_LIMIT
+ + ". Value cannot be less than or equal to 0.";
+ throw new IllegalArgumentException(message);
+ }
+ float yarnVCores = (nodeCpuPercentage * nodeVCores) / 100.0f;
+ // CPU should be set to a percentage * 100, e.g. 20% cpu rate limit
+ // should be set as 20 * 100. The following setting is equal to:
+ // 100 * (100 * (vcores / Total # of cores allocated to YARN))
+ cpuRate = Math.min(10000,
+ (int) ((containerVCores * 10000) / yarnVCores));
+ }
+ }
+ return new String[] { Shell.WINUTILS, "task", "create", "-m",
+ String.valueOf(memory), "-c", String.valueOf(cpuRate), groupId,
"cmd /c " + command };
} else {
List retCommand = new ArrayList();
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/DefaultContainerExecutor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/DefaultContainerExecutor.java
index f3d21210b85..e0ecea322eb 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/DefaultContainerExecutor.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/DefaultContainerExecutor.java
@@ -48,6 +48,7 @@ import org.apache.hadoop.util.Shell.CommandExecutor;
import org.apache.hadoop.util.Shell.ShellCommandExecutor;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
@@ -202,7 +203,7 @@ public class DefaultContainerExecutor extends ContainerExecutor {
setScriptExecutable(sb.getWrapperScriptPath(), user);
shExec = buildCommandExecutor(sb.getWrapperScriptPath().toString(),
- containerIdStr, user, pidFile,
+ containerIdStr, user, pidFile, container.getResource(),
new File(containerWorkDir.toUri().getPath()),
container.getLaunchContext().getEnvironment());
@@ -256,12 +257,12 @@ public class DefaultContainerExecutor extends ContainerExecutor {
}
protected CommandExecutor buildCommandExecutor(String wrapperScriptPath,
- String containerIdStr, String user, Path pidFile, File wordDir,
- Map environment)
+ String containerIdStr, String user, Path pidFile, Resource resource,
+ File wordDir, Map environment)
throws IOException {
String[] command = getRunCommand(wrapperScriptPath,
- containerIdStr, user, pidFile, this.getConf());
+ containerIdStr, user, pidFile, this.getConf(), resource);
LOG.info("launchContainer: " + Arrays.toString(command));
return new ShellCommandExecutor(
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/WindowsSecureContainerExecutor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/WindowsSecureContainerExecutor.java
index cd3e71a8d62..b7bec5f0cff 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/WindowsSecureContainerExecutor.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/WindowsSecureContainerExecutor.java
@@ -52,6 +52,7 @@ import org.apache.hadoop.io.nativeio.NativeIOException;
import org.apache.hadoop.util.NativeCodeLoader;
import org.apache.hadoop.util.Shell;
import org.apache.hadoop.util.Shell.CommandExecutor;
+import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ContainerLocalizer;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ResourceLocalizationService;
@@ -727,11 +728,9 @@ public class WindowsSecureContainerExecutor extends DefaultContainerExecutor {
}
@Override
- protected CommandExecutor buildCommandExecutor(String wrapperScriptPath,
- String containerIdStr,
- String userName, Path pidFile,File wordDir, Map environment)
- throws IOException {
-
+ protected CommandExecutor buildCommandExecutor(String wrapperScriptPath,
+ String containerIdStr, String userName, Path pidFile, Resource resource,
+ File wordDir, Map environment) throws IOException {
return new WintuilsProcessStubExecutor(
wordDir.toString(),
containerIdStr, userName, pidFile.toString(),
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestContainerExecutor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestContainerExecutor.java
index fd3634bd1c9..dc3e9418034 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestContainerExecutor.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestContainerExecutor.java
@@ -18,13 +18,21 @@
package org.apache.hadoop.yarn.server.nodemanager;
+import java.util.Arrays;
+
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.util.Shell;
+import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.server.nodemanager.util.NodeManagerHardwareUtils;
+import org.apache.hadoop.yarn.util.ResourceCalculatorPlugin;
+import org.junit.Assert;
import org.junit.Test;
+
import static org.junit.Assert.*;
+import static org.junit.Assume.assumeTrue;
public class TestContainerExecutor {
@@ -69,4 +77,49 @@ public class TestContainerExecutor {
}
}
+ @Test (timeout = 5000)
+ public void testRunCommandWithNoResources() {
+ // Windows only test
+ assumeTrue(Shell.WINDOWS);
+ Configuration conf = new Configuration();
+ String[] command = containerExecutor.getRunCommand("echo", "group1", null, null,
+ conf, Resource.newInstance(1024, 1));
+ // Assert the cpu and memory limits are set correctly in the command
+ String[] expected = { Shell.WINUTILS, "task", "create", "-m", "-1", "-c",
+ "-1", "group1", "cmd /c " + "echo" };
+ Assert.assertTrue(Arrays.equals(expected, command));
+ }
+
+ @Test (timeout = 5000)
+ public void testRunCommandWithMemoryOnlyResources() {
+ // Windows only test
+ assumeTrue(Shell.WINDOWS);
+ Configuration conf = new Configuration();
+ conf.set(YarnConfiguration.NM_WINDOWS_CONTAINER_MEMORY_LIMIT_ENABLED, "true");
+ String[] command = containerExecutor.getRunCommand("echo", "group1", null, null,
+ conf, Resource.newInstance(1024, 1));
+ // Assert the cpu and memory limits are set correctly in the command
+ String[] expected = { Shell.WINUTILS, "task", "create", "-m", "1024", "-c",
+ "-1", "group1", "cmd /c " + "echo" };
+ Assert.assertTrue(Arrays.equals(expected, command));
+ }
+
+ @Test (timeout = 5000)
+ public void testRunCommandWithCpuAndMemoryResources() {
+ // Windows only test
+ assumeTrue(Shell.WINDOWS);
+ Configuration conf = new Configuration();
+ conf.set(YarnConfiguration.NM_WINDOWS_CONTAINER_CPU_LIMIT_ENABLED, "true");
+ conf.set(YarnConfiguration.NM_WINDOWS_CONTAINER_MEMORY_LIMIT_ENABLED, "true");
+ String[] command = containerExecutor.getRunCommand("echo", "group1", null, null,
+ conf, Resource.newInstance(1024, 1));
+ float yarnProcessors = NodeManagerHardwareUtils.getContainersCores(
+ ResourceCalculatorPlugin.getResourceCalculatorPlugin(null, conf),
+ conf);
+ int cpuRate = Math.min(10000, (int) ((1 * 10000) / yarnProcessors));
+ // Assert the cpu and memory limits are set correctly in the command
+ String[] expected = { Shell.WINUTILS, "task", "create", "-m", "1024", "-c",
+ String.valueOf(cpuRate), "group1", "cmd /c " + "echo" };
+ Assert.assertTrue(Arrays.equals(expected, command));
+ }
}