YARN-443. allow OS scheduling priority of NM to be different than the containers it launches (tgraves)

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1454411 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Thomas Graves 2013-03-08 14:46:26 +00:00
parent 4909821aa9
commit 5f2c518c95
7 changed files with 179 additions and 17 deletions

View File

@ -392,6 +392,10 @@ Release 0.23.7 - UNRELEASED
YARN-227. Application expiration difficult to debug for end-users YARN-227. Application expiration difficult to debug for end-users
(Jason Lowe via jeagles) (Jason Lowe via jeagles)
YARN-443. allow OS scheduling priority of NM to be different than the
containers it launches (tgraves)
OPTIMIZATIONS OPTIMIZATIONS

View File

@ -304,6 +304,17 @@ public class YarnConfiguration extends Configuration {
/** who will execute(launch) the containers.*/ /** who will execute(launch) the containers.*/
public static final String NM_CONTAINER_EXECUTOR = public static final String NM_CONTAINER_EXECUTOR =
NM_PREFIX + "container-executor.class"; NM_PREFIX + "container-executor.class";
/**
* Adjustment to make to the container os scheduling priority.
* The valid values for this could vary depending on the platform.
* On Linux, higher values mean run the containers at a less
* favorable priority than the NM.
* The value specified is an int.
*/
public static final String NM_CONTAINER_EXECUTOR_SCHED_PRIORITY =
NM_PREFIX + "container-executor.os.sched.priority.adjustment";
public static final int DEFAULT_NM_CONTAINER_EXECUTOR_SCHED_PRIORITY = 0;
/** Number of threads container manager uses.*/ /** Number of threads container manager uses.*/
public static final String NM_CONTAINER_MGR_THREAD_COUNT = public static final String NM_CONTAINER_MGR_THREAD_COUNT =

View File

@ -20,6 +20,8 @@ package org.apache.hadoop.yarn.server.nodemanager;
import java.io.IOException; import java.io.IOException;
import java.net.InetSocketAddress; import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List; import java.util.List;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ConcurrentMap;
@ -35,6 +37,7 @@ import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission; import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.util.Shell.ShellCommandExecutor; import org.apache.hadoop.util.Shell.ShellCommandExecutor;
import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container; import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
import org.apache.hadoop.yarn.server.nodemanager.util.ProcessIdFileReader; import org.apache.hadoop.yarn.server.nodemanager.util.ProcessIdFileReader;
import org.apache.hadoop.util.Shell; import org.apache.hadoop.util.Shell;
@ -184,18 +187,39 @@ public abstract class ContainerExecutor implements Configurable {
} }
} }
/** Return a command to execute the given command in OS shell. /**
* Return a command to execute the given command in OS shell.
* On Windows, the passed in groupId can be used to launch * On Windows, the passed in groupId can be used to launch
* and associate the given groupId in a process group. On * and associate the given groupId in a process group. On
* non-Windows, groupId is ignored. */ * non-Windows, groupId is ignored.
protected static String[] getRunCommand(String command, */
String groupId) { protected static String[] getRunCommand(String command, String groupId,
Configuration conf) {
boolean containerSchedPriorityIsSet = false;
int containerSchedPriorityAdjustment =
YarnConfiguration.DEFAULT_NM_CONTAINER_EXECUTOR_SCHED_PRIORITY;
if (conf.get(YarnConfiguration.NM_CONTAINER_EXECUTOR_SCHED_PRIORITY) !=
null) {
containerSchedPriorityIsSet = true;
containerSchedPriorityAdjustment = conf
.getInt(YarnConfiguration.NM_CONTAINER_EXECUTOR_SCHED_PRIORITY,
YarnConfiguration.DEFAULT_NM_CONTAINER_EXECUTOR_SCHED_PRIORITY);
}
if (Shell.WINDOWS) { if (Shell.WINDOWS) {
return new String[] { Shell.WINUTILS, "task", "create", groupId, return new String[] { Shell.WINUTILS, "task", "create", groupId,
"cmd /c " + command }; "cmd /c " + command };
} else { } else {
return new String[] { "bash", "-c", command }; List<String> retCommand = new ArrayList<String>();
if (containerSchedPriorityIsSet) {
retCommand.addAll(Arrays.asList("nice", "-n",
Integer.toString(containerSchedPriorityAdjustment)));
}
retCommand.addAll(Arrays.asList("bash", "-c", command));
return retCommand.toArray(new String[retCommand.size()]);
} }
} }
/** Return a command for determining if process with specified pid is alive. */ /** Return a command for determining if process with specified pid is alive. */

View File

@ -181,7 +181,7 @@ public class DefaultContainerExecutor extends ContainerExecutor {
// Setup command to run // Setup command to run
String[] command = getRunCommand(sb.getWrapperScriptPath().toString(), String[] command = getRunCommand(sb.getWrapperScriptPath().toString(),
containerIdStr); containerIdStr, this.getConf());
LOG.info("launchContainer: " + Arrays.toString(command)); LOG.info("launchContainer: " + Arrays.toString(command));
shExec = new ShellCommandExecutor( shExec = new ShellCommandExecutor(

View File

@ -50,6 +50,8 @@ public class LinuxContainerExecutor extends ContainerExecutor {
private String containerExecutorExe; private String containerExecutorExe;
private LCEResourcesHandler resourcesHandler; private LCEResourcesHandler resourcesHandler;
private boolean containerSchedPriorityIsSet = false;
private int containerSchedPriorityAdjustment = 0;
@Override @Override
@ -61,6 +63,13 @@ public class LinuxContainerExecutor extends ContainerExecutor {
conf.getClass(YarnConfiguration.NM_LINUX_CONTAINER_RESOURCES_HANDLER, conf.getClass(YarnConfiguration.NM_LINUX_CONTAINER_RESOURCES_HANDLER,
DefaultLCEResourcesHandler.class, LCEResourcesHandler.class), conf); DefaultLCEResourcesHandler.class, LCEResourcesHandler.class), conf);
resourcesHandler.setConf(conf); resourcesHandler.setConf(conf);
if (conf.get(YarnConfiguration.NM_CONTAINER_EXECUTOR_SCHED_PRIORITY) != null) {
containerSchedPriorityIsSet = true;
containerSchedPriorityAdjustment = conf
.getInt(YarnConfiguration.NM_CONTAINER_EXECUTOR_SCHED_PRIORITY,
YarnConfiguration.DEFAULT_NM_CONTAINER_EXECUTOR_SCHED_PRIORITY);
}
} }
/** /**
@ -114,6 +123,13 @@ public class LinuxContainerExecutor extends ContainerExecutor {
: conf.get(YarnConfiguration.NM_LINUX_CONTAINER_EXECUTOR_PATH, defaultPath); : conf.get(YarnConfiguration.NM_LINUX_CONTAINER_EXECUTOR_PATH, defaultPath);
} }
protected void addSchedPriorityCommand(List<String> command) {
if (containerSchedPriorityIsSet) {
command.addAll(Arrays.asList("nice", "-n",
Integer.toString(containerSchedPriorityAdjustment)));
}
}
@Override @Override
public void init() throws IOException { public void init() throws IOException {
// Send command to executor which will just start up, // Send command to executor which will just start up,
@ -145,14 +161,15 @@ public class LinuxContainerExecutor extends ContainerExecutor {
List<String> localDirs, List<String> logDirs) List<String> localDirs, List<String> logDirs)
throws IOException, InterruptedException { throws IOException, InterruptedException {
List<String> command = new ArrayList<String>( List<String> command = new ArrayList<String>();
Arrays.asList(containerExecutorExe, addSchedPriorityCommand(command);
user, command.addAll(Arrays.asList(containerExecutorExe,
Integer.toString(Commands.INITIALIZE_CONTAINER.getValue()), user,
appId, Integer.toString(Commands.INITIALIZE_CONTAINER.getValue()),
nmPrivateContainerTokensPath.toUri().getPath().toString(), appId,
StringUtils.join(",", localDirs), nmPrivateContainerTokensPath.toUri().getPath().toString(),
StringUtils.join(",", logDirs))); StringUtils.join(",", localDirs),
StringUtils.join(",", logDirs)));
File jvm = // use same jvm as parent File jvm = // use same jvm as parent
new File(new File(System.getProperty("java.home"), "bin"), "java"); new File(new File(System.getProperty("java.home"), "bin"), "java");
@ -212,7 +229,9 @@ public class LinuxContainerExecutor extends ContainerExecutor {
try { try {
Path pidFilePath = getPidFilePath(containerId); Path pidFilePath = getPidFilePath(containerId);
if (pidFilePath != null) { if (pidFilePath != null) {
List<String> command = new ArrayList<String>(Arrays.asList( List<String> command = new ArrayList<String>();
addSchedPriorityCommand(command);
command.addAll(Arrays.asList(
containerExecutorExe, user, Integer containerExecutorExe, user, Integer
.toString(Commands.LAUNCH_CONTAINER.getValue()), appId, .toString(Commands.LAUNCH_CONTAINER.getValue()), appId,
containerIdStr, containerWorkDir.toString(), containerIdStr, containerWorkDir.toString(),

View File

@ -0,0 +1,70 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.nodemanager;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.util.Shell;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.junit.Test;
import static org.junit.Assert.*;
public class TestContainerExecutor {
@Test (timeout = 5000)
public void testRunCommandNoPriority() throws Exception {
Configuration conf = new Configuration();
String[] command = ContainerExecutor.getRunCommand("echo", "group1", conf);
assertTrue("first command should be the run command for the platform",
command[0].equals(Shell.WINUTILS) || command[0].equals("bash"));
}
@Test (timeout = 5000)
public void testRunCommandwithPriority() throws Exception {
Configuration conf = new Configuration();
conf.setInt(YarnConfiguration.NM_CONTAINER_EXECUTOR_SCHED_PRIORITY, 2);
String[] command = ContainerExecutor.getRunCommand("echo", "group1", conf);
if (Shell.WINDOWS) {
// windows doesn't currently support
assertEquals("first command should be the run command for the platform",
Shell.WINUTILS, command[0]);
} else {
assertEquals("first command should be nice", "nice", command[0]);
assertEquals("second command should be -n", "-n", command[1]);
assertEquals("third command should be the priority", Integer.toString(2),
command[2]);
}
// test with negative number
conf.setInt(YarnConfiguration.NM_CONTAINER_EXECUTOR_SCHED_PRIORITY, -5);
command = ContainerExecutor.getRunCommand("echo", "group1", conf);
if (Shell.WINDOWS) {
// windows doesn't currently support
assertEquals("first command should be the run command for the platform",
Shell.WINUTILS, command[0]);
} else {
assertEquals("first command should be nice", "nice", command[0]);
assertEquals("second command should be -n", "-n", command[1]);
assertEquals("third command should be the priority", Integer.toString(-5),
command[2]);
}
}
}

View File

@ -27,6 +27,7 @@ import java.io.FileReader;
import java.io.IOException; import java.io.IOException;
import java.io.LineNumberReader; import java.io.LineNumberReader;
import java.net.InetSocketAddress; import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.Arrays; import java.util.Arrays;
import java.util.HashMap; import java.util.HashMap;
import java.util.LinkedList; import java.util.LinkedList;
@ -131,8 +132,41 @@ public class TestLinuxContainerExecutorWithMocks {
} }
@Test (timeout = 5000)
public void testContainerLaunchWithPriority() throws IOException {
// set the scheduler priority to make sure still works with nice -n prio
File f = new File("./src/test/resources/mock-container-executor");
if (!f.canExecute()) {
f.setExecutable(true);
}
String executorPath = f.getAbsolutePath();
Configuration conf = new Configuration();
conf.set(YarnConfiguration.NM_LINUX_CONTAINER_EXECUTOR_PATH, executorPath);
conf.setInt(YarnConfiguration.NM_CONTAINER_EXECUTOR_SCHED_PRIORITY, 2);
mockExec.setConf(conf);
List<String> command = new ArrayList<String>();
mockExec.addSchedPriorityCommand(command);
assertEquals("first should be nice", "nice", command.get(0));
assertEquals("second should be -n", "-n", command.get(1));
assertEquals("third should be the priority", Integer.toString(2),
command.get(2));
testContainerLaunch();
}
@Test (timeout = 5000)
public void testLaunchCommandWithoutPriority() throws IOException {
// make sure the command doesn't contain the nice -n since priority
// not specified
List<String> command = new ArrayList<String>();
mockExec.addSchedPriorityCommand(command);
assertEquals("addSchedPriority should be empty", 0, command.size());
}
@Test @Test (timeout = 5000)
public void testStartLocalizer() throws IOException { public void testStartLocalizer() throws IOException {