YARN-2502. Changed DistributedShell to support node labels. Contributed by Wangda Tan

This commit is contained in:
Jian He 2014-10-27 20:13:00 -07:00
parent b0e19c9d54
commit f6b963fdfc
3 changed files with 163 additions and 19 deletions

View File

@ -404,6 +404,9 @@ Release 2.6.0 - UNRELEASED
sake of localization and log-aggregation for long-running services. (Jian He sake of localization and log-aggregation for long-running services. (Jian He
via vinodkv) via vinodkv)
YARN-2502. Changed DistributedShell to support node labels. (Wangda Tan via
jianhe)
OPTIMIZATIONS OPTIMIZATIONS
BUG FIXES BUG FIXES

View File

@ -115,7 +115,7 @@ import org.apache.hadoop.yarn.util.timeline.TimelineUtils;
public class Client { public class Client {
private static final Log LOG = LogFactory.getLog(Client.class); private static final Log LOG = LogFactory.getLog(Client.class);
// Configuration // Configuration
private Configuration conf; private Configuration conf;
private YarnClient yarnClient; private YarnClient yarnClient;
@ -152,6 +152,7 @@ public class Client {
private int containerVirtualCores = 1; private int containerVirtualCores = 1;
// No. of containers in which the shell script needs to be executed // No. of containers in which the shell script needs to be executed
private int numContainers = 1; private int numContainers = 1;
private String nodeLabelExpression = null;
// log4j.properties file // log4j.properties file
// if available, add to local resources and set into classpath // if available, add to local resources and set into classpath
@ -280,7 +281,12 @@ public class Client {
opts.addOption("create", false, "Flag to indicate whether to create the " opts.addOption("create", false, "Flag to indicate whether to create the "
+ "domain specified with -domain."); + "domain specified with -domain.");
opts.addOption("help", false, "Print usage"); opts.addOption("help", false, "Print usage");
opts.addOption("node_label_expression", true,
"Node label expression to determine the nodes"
+ " where all the containers of this application"
+ " will be allocated, \"\" means containers"
+ " can be allocated anywhere, if you don't specify the option,"
+ " default node_label_expression of queue will be used.");
} }
/** /**
@ -391,6 +397,7 @@ public class Client {
containerMemory = Integer.parseInt(cliParser.getOptionValue("container_memory", "10")); containerMemory = Integer.parseInt(cliParser.getOptionValue("container_memory", "10"));
containerVirtualCores = Integer.parseInt(cliParser.getOptionValue("container_vcores", "1")); containerVirtualCores = Integer.parseInt(cliParser.getOptionValue("container_vcores", "1"));
numContainers = Integer.parseInt(cliParser.getOptionValue("num_containers", "1")); numContainers = Integer.parseInt(cliParser.getOptionValue("num_containers", "1"));
if (containerMemory < 0 || containerVirtualCores < 0 || numContainers < 1) { if (containerMemory < 0 || containerVirtualCores < 0 || numContainers < 1) {
throw new IllegalArgumentException("Invalid no. of containers or container memory/vcores specified," throw new IllegalArgumentException("Invalid no. of containers or container memory/vcores specified,"
@ -399,6 +406,8 @@ public class Client {
+ ", containerVirtualCores=" + containerVirtualCores + ", containerVirtualCores=" + containerVirtualCores
+ ", numContainer=" + numContainers); + ", numContainer=" + numContainers);
} }
nodeLabelExpression = cliParser.getOptionValue("node_label_expression", null);
clientTimeout = Integer.parseInt(cliParser.getOptionValue("timeout", "600000")); clientTimeout = Integer.parseInt(cliParser.getOptionValue("timeout", "600000"));
@ -617,6 +626,9 @@ public class Client {
vargs.add("--container_memory " + String.valueOf(containerMemory)); vargs.add("--container_memory " + String.valueOf(containerMemory));
vargs.add("--container_vcores " + String.valueOf(containerVirtualCores)); vargs.add("--container_vcores " + String.valueOf(containerVirtualCores));
vargs.add("--num_containers " + String.valueOf(numContainers)); vargs.add("--num_containers " + String.valueOf(numContainers));
if (null != nodeLabelExpression) {
appContext.setNodeLabelExpression(nodeLabelExpression);
}
vargs.add("--priority " + String.valueOf(shellCmdPriority)); vargs.add("--priority " + String.valueOf(shellCmdPriority));
for (Map.Entry<String, String> entry : shellEnv.entrySet()) { for (Map.Entry<String, String> entry : shellEnv.entrySet()) {

View File

@ -30,7 +30,9 @@ import java.net.InetAddress;
import java.net.URL; import java.net.URL;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Arrays; import java.util.Arrays;
import java.util.HashSet;
import java.util.List; import java.util.List;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
@ -42,6 +44,7 @@ import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.util.JarFinder; import org.apache.hadoop.util.JarFinder;
import org.apache.hadoop.util.Shell; import org.apache.hadoop.util.Shell;
import org.apache.hadoop.yarn.api.records.ApplicationReport; import org.apache.hadoop.yarn.api.records.ApplicationReport;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.YarnApplicationState; import org.apache.hadoop.yarn.api.records.YarnApplicationState;
import org.apache.hadoop.yarn.api.records.timeline.TimelineDomain; import org.apache.hadoop.yarn.api.records.timeline.TimelineDomain;
import org.apache.hadoop.yarn.api.records.timeline.TimelineEntities; import org.apache.hadoop.yarn.api.records.timeline.TimelineEntities;
@ -49,40 +52,81 @@ import org.apache.hadoop.yarn.client.api.YarnClient;
import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.server.MiniYARNCluster; import org.apache.hadoop.yarn.server.MiniYARNCluster;
import org.apache.hadoop.yarn.server.nodemanager.NodeManager; import org.apache.hadoop.yarn.server.nodemanager.NodeManager;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.ContainerManagerImpl; import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler; import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fifo.FifoScheduler; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
import org.junit.After; import org.junit.After;
import org.junit.Assert; import org.junit.Assert;
import org.junit.Before; import org.junit.Before;
import org.junit.Test; import org.junit.Test;
import com.google.common.collect.ImmutableMap;
public class TestDistributedShell { public class TestDistributedShell {
private static final Log LOG = private static final Log LOG =
LogFactory.getLog(TestDistributedShell.class); LogFactory.getLog(TestDistributedShell.class);
protected MiniYARNCluster yarnCluster = null; protected MiniYARNCluster yarnCluster = null;
protected Configuration conf = new YarnConfiguration(); private int numNodeManager = 1;
private YarnConfiguration conf = null;
protected final static String APPMASTER_JAR = protected final static String APPMASTER_JAR =
JarFinder.getJar(ApplicationMaster.class); JarFinder.getJar(ApplicationMaster.class);
private void initializeNodeLabels() throws IOException {
RMContext rmContext = yarnCluster.getResourceManager(0).getRMContext();
// Setup node labels
RMNodeLabelsManager labelsMgr = rmContext.getNodeLabelManager();
Set<String> labels = new HashSet<String>();
labels.add("x");
labelsMgr.addToCluserNodeLabels(labels);
// Setup queue access to node labels
conf.set("yarn.scheduler.capacity.root.accessible-node-labels", "x");
conf.set("yarn.scheduler.capacity.root.default.accessible-node-labels", "x");
conf.set(
"yarn.scheduler.capacity.root.default.accessible-node-labels.x.capacity",
"100");
rmContext.getScheduler().reinitialize(conf, rmContext);
// Fetch node-ids from yarn cluster
NodeId[] nodeIds = new NodeId[numNodeManager];
for (int i = 0; i < numNodeManager; i++) {
NodeManager mgr = this.yarnCluster.getNodeManager(i);
nodeIds[i] = mgr.getNMContext().getNodeId();
}
// Set label x to NM[1]
labelsMgr.addLabelsToNode(ImmutableMap.of(nodeIds[1], labels));
}
@Before @Before
public void setup() throws Exception { public void setup() throws Exception {
LOG.info("Starting up YARN cluster"); LOG.info("Starting up YARN cluster");
conf = new YarnConfiguration();
conf.setInt(YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB, 128); conf.setInt(YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB, 128);
conf.setClass(YarnConfiguration.RM_SCHEDULER,
FifoScheduler.class, ResourceScheduler.class);
conf.set("yarn.log.dir", "target"); conf.set("yarn.log.dir", "target");
conf.setBoolean(YarnConfiguration.TIMELINE_SERVICE_ENABLED, true); conf.setBoolean(YarnConfiguration.TIMELINE_SERVICE_ENABLED, true);
conf.set(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class.getName());
numNodeManager = 2;
if (yarnCluster == null) { if (yarnCluster == null) {
yarnCluster = new MiniYARNCluster( yarnCluster =
TestDistributedShell.class.getSimpleName(), 1, 1, 1, 1, true); new MiniYARNCluster(TestDistributedShell.class.getSimpleName(), 1,
numNodeManager, 1, 1, true);
yarnCluster.init(conf); yarnCluster.init(conf);
yarnCluster.start(); yarnCluster.start();
NodeManager nm = yarnCluster.getNodeManager(0);
waitForNMToRegister(nm); waitForNMsToRegister();
// currently only capacity scheduler support node labels,
initializeNodeLabels();
URL url = Thread.currentThread().getContextClassLoader().getResource("yarn-site.xml"); URL url = Thread.currentThread().getContextClassLoader().getResource("yarn-site.xml");
if (url == null) { if (url == null) {
@ -757,13 +801,15 @@ public class TestDistributedShell {
} }
} }
protected static void waitForNMToRegister(NodeManager nm) protected void waitForNMsToRegister() throws Exception {
throws Exception { int sec = 60;
int attempt = 60; while (sec >= 0) {
ContainerManagerImpl cm = if (yarnCluster.getResourceManager().getRMContext().getRMNodes().size()
((ContainerManagerImpl) nm.getNMContext().getContainerManager()); >= numNodeManager) {
while (cm.getBlockNewContainerRequestsStatus() && attempt-- > 0) { break;
Thread.sleep(2000); }
Thread.sleep(1000);
sec--;
} }
} }
@ -892,5 +938,88 @@ public class TestDistributedShell {
} }
return numOfWords; return numOfWords;
} }
@Test(timeout=90000)
public void testDSShellWithNodeLabelExpression() throws Exception {
// Start NMContainerMonitor
NMContainerMonitor mon = new NMContainerMonitor();
Thread t = new Thread(mon);
t.start();
// Submit a job which will sleep for 60 sec
String[] args = {
"--jar",
APPMASTER_JAR,
"--num_containers",
"4",
"--shell_command",
"sleep",
"--shell_args",
"15",
"--master_memory",
"512",
"--master_vcores",
"2",
"--container_memory",
"128",
"--container_vcores",
"1",
"--node_label_expression",
"x"
};
LOG.info("Initializing DS Client");
final Client client =
new Client(new Configuration(yarnCluster.getConfig()));
boolean initSuccess = client.init(args);
Assert.assertTrue(initSuccess);
LOG.info("Running DS Client");
boolean result = client.run();
LOG.info("Client run completed. Result=" + result);
t.interrupt();
// Check maximum number of containers on each NMs
int[] maxRunningContainersOnNMs = mon.getMaxRunningContainersReport();
// Check no container allocated on NM[0]
Assert.assertEquals(0, maxRunningContainersOnNMs[0]);
// Check there're some containers allocated on NM[1]
Assert.assertTrue(maxRunningContainersOnNMs[1] > 0);
}
/**
* Monitor containers running on NMs
*/
private class NMContainerMonitor implements Runnable {
// The interval of milliseconds of sampling (500ms)
final static int SAMPLING_INTERVAL_MS = 500;
// The maximum number of containers running on each NMs
int[] maxRunningContainersOnNMs = new int[numNodeManager];
@Override
public void run() {
while (true) {
for (int i = 0; i < numNodeManager; i++) {
int nContainers =
yarnCluster.getNodeManager(i).getNMContext().getContainers()
.size();
if (nContainers > maxRunningContainersOnNMs[i]) {
maxRunningContainersOnNMs[i] = nContainers;
}
}
try {
Thread.sleep(SAMPLING_INTERVAL_MS);
} catch (InterruptedException e) {
e.printStackTrace();
break;
}
}
}
public int[] getMaxRunningContainersReport() {
return maxRunningContainersOnNMs;
}
}
} }