YARN-8175. Add support for Node Labels in SLS. Contributed by Abhishek Modi.

This commit is contained in:
Inigo Goiri 2018-07-31 09:36:34 -07:00
parent b28bdc7e8b
commit 9fea5c9ee7
12 changed files with 301 additions and 138 deletions

View File

@ -60,6 +60,7 @@ import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner; import org.apache.hadoop.util.ToolRunner;
import org.apache.hadoop.yarn.api.records.ExecutionType; import org.apache.hadoop.yarn.api.records.ExecutionType;
import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.NodeLabel;
import org.apache.hadoop.yarn.api.records.NodeState; import org.apache.hadoop.yarn.api.records.NodeState;
import org.apache.hadoop.yarn.api.records.ReservationId; import org.apache.hadoop.yarn.api.records.ReservationId;
import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.Resource;
@ -298,30 +299,20 @@ public class SLSRunner extends Configured implements Tool {
SLSConfiguration.NM_RESOURCE_UTILIZATION_RATIO, SLSConfiguration.NM_RESOURCE_UTILIZATION_RATIO,
SLSConfiguration.NM_RESOURCE_UTILIZATION_RATIO_DEFAULT); SLSConfiguration.NM_RESOURCE_UTILIZATION_RATIO_DEFAULT);
// nm information (fetch from topology file, or from sls/rumen json file) // nm information (fetch from topology file, or from sls/rumen json file)
Map<String, Resource> nodeResourceMap = new HashMap<>(); Set<NodeDetails> nodeSet = null;
Set<? extends String> nodeSet;
if (nodeFile.isEmpty()) { if (nodeFile.isEmpty()) {
for (String inputTrace : inputTraces) { for (String inputTrace : inputTraces) {
switch (inputType) { switch (inputType) {
case SLS: case SLS:
nodeSet = SLSUtils.parseNodesFromSLSTrace(inputTrace); nodeSet = SLSUtils.parseNodesFromSLSTrace(inputTrace);
for (String node : nodeSet) {
nodeResourceMap.put(node, null);
}
break; break;
case RUMEN: case RUMEN:
nodeSet = SLSUtils.parseNodesFromRumenTrace(inputTrace); nodeSet = SLSUtils.parseNodesFromRumenTrace(inputTrace);
for (String node : nodeSet) {
nodeResourceMap.put(node, null);
}
break; break;
case SYNTH: case SYNTH:
stjp = new SynthTraceJobProducer(getConf(), new Path(inputTraces[0])); stjp = new SynthTraceJobProducer(getConf(), new Path(inputTraces[0]));
nodeSet = SLSUtils.generateNodes(stjp.getNumNodes(), nodeSet = SLSUtils.generateNodes(stjp.getNumNodes(),
stjp.getNumNodes()/stjp.getNodesPerRack()); stjp.getNumNodes()/stjp.getNodesPerRack());
for (String node : nodeSet) {
nodeResourceMap.put(node, null);
}
break; break;
default: default:
throw new YarnException("Input configuration not recognized, " throw new YarnException("Input configuration not recognized, "
@ -329,11 +320,11 @@ public class SLSRunner extends Configured implements Tool {
} }
} }
} else { } else {
nodeResourceMap = SLSUtils.parseNodesFromNodeFile(nodeFile, nodeSet = SLSUtils.parseNodesFromNodeFile(nodeFile,
nodeManagerResource); nodeManagerResource);
} }
if (nodeResourceMap.size() == 0) { if (nodeSet == null || nodeSet.isEmpty()) {
throw new YarnException("No node! Please configure nodes."); throw new YarnException("No node! Please configure nodes.");
} }
@ -344,20 +335,21 @@ public class SLSRunner extends Configured implements Tool {
SLSConfiguration.RUNNER_POOL_SIZE_DEFAULT); SLSConfiguration.RUNNER_POOL_SIZE_DEFAULT);
ExecutorService executorService = Executors. ExecutorService executorService = Executors.
newFixedThreadPool(threadPoolSize); newFixedThreadPool(threadPoolSize);
for (Map.Entry<String, Resource> entry : nodeResourceMap.entrySet()) { for (NodeDetails nodeDetails : nodeSet) {
executorService.submit(new Runnable() { executorService.submit(new Runnable() {
@Override public void run() { @Override public void run() {
try { try {
// we randomize the heartbeat start time from zero to 1 interval // we randomize the heartbeat start time from zero to 1 interval
NMSimulator nm = new NMSimulator(); NMSimulator nm = new NMSimulator();
Resource nmResource = nodeManagerResource; Resource nmResource = nodeManagerResource;
String hostName = entry.getKey(); String hostName = nodeDetails.getHostname();
if (entry.getValue() != null) { if (nodeDetails.getNodeResource() != null) {
nmResource = entry.getValue(); nmResource = nodeDetails.getNodeResource();
} }
Set<NodeLabel> nodeLabels = nodeDetails.getLabels();
nm.init(hostName, nmResource, nm.init(hostName, nmResource,
random.nextInt(heartbeatInterval), random.nextInt(heartbeatInterval),
heartbeatInterval, rm, resourceUtilizationRatio); heartbeatInterval, rm, resourceUtilizationRatio, nodeLabels);
nmMap.put(nm.getNode().getNodeID(), nm); nmMap.put(nm.getNode().getNodeID(), nm);
runner.schedule(nm); runner.schedule(nm);
rackSet.add(nm.getNode().getRackName()); rackSet.add(nm.getNode().getRackName());
@ -452,6 +444,11 @@ public class SLSRunner extends Configured implements Tool {
jsonJob.get(SLSConfiguration.JOB_END_MS).toString()); jsonJob.get(SLSConfiguration.JOB_END_MS).toString());
} }
String jobLabelExpr = null;
if (jsonJob.containsKey(SLSConfiguration.JOB_LABEL_EXPR)) {
jobLabelExpr = jsonJob.get(SLSConfiguration.JOB_LABEL_EXPR).toString();
}
String user = (String) jsonJob.get(SLSConfiguration.JOB_USER); String user = (String) jsonJob.get(SLSConfiguration.JOB_USER);
if (user == null) { if (user == null) {
user = "default"; user = "default";
@ -481,7 +478,8 @@ public class SLSRunner extends Configured implements Tool {
for (int i = 0; i < jobCount; i++) { for (int i = 0; i < jobCount; i++) {
runNewAM(amType, user, queue, oldAppId, jobStartTime, jobFinishTime, runNewAM(amType, user, queue, oldAppId, jobStartTime, jobFinishTime,
getTaskContainers(jsonJob), getAMContainerResource(jsonJob)); getTaskContainers(jsonJob), getAMContainerResource(jsonJob),
jobLabelExpr);
} }
} }
@ -730,7 +728,7 @@ public class SLSRunner extends Configured implements Tool {
runNewAM(job.getType(), user, jobQueue, oldJobId, runNewAM(job.getType(), user, jobQueue, oldJobId,
jobStartTimeMS, jobFinishTimeMS, containerList, reservationId, jobStartTimeMS, jobFinishTimeMS, containerList, reservationId,
job.getDeadline(), getAMContainerResource(null), job.getDeadline(), getAMContainerResource(null), null,
job.getParams()); job.getParams());
} }
} }
@ -775,15 +773,24 @@ public class SLSRunner extends Configured implements Tool {
Resource amContainerResource) { Resource amContainerResource) {
runNewAM(jobType, user, jobQueue, oldJobId, jobStartTimeMS, runNewAM(jobType, user, jobQueue, oldJobId, jobStartTimeMS,
jobFinishTimeMS, containerList, null, -1, jobFinishTimeMS, containerList, null, -1,
amContainerResource, null); amContainerResource, null, null);
} }
private void runNewAM(String jobType, String user, private void runNewAM(String jobType, String user,
String jobQueue, String oldJobId, long jobStartTimeMS, String jobQueue, String oldJobId, long jobStartTimeMS,
long jobFinishTimeMS, List<ContainerSimulator> containerList, long jobFinishTimeMS, List<ContainerSimulator> containerList,
ReservationId reservationId, long deadline, Resource amContainerResource, Resource amContainerResource, String labelExpr) {
Map<String, String> params) { runNewAM(jobType, user, jobQueue, oldJobId, jobStartTimeMS,
jobFinishTimeMS, containerList, null, -1,
amContainerResource, labelExpr, null);
}
@SuppressWarnings("checkstyle:parameternumber")
private void runNewAM(String jobType, String user,
String jobQueue, String oldJobId, long jobStartTimeMS,
long jobFinishTimeMS, List<ContainerSimulator> containerList,
ReservationId reservationId, long deadline, Resource amContainerResource,
String labelExpr, Map<String, String> params) {
AMSimulator amSim = (AMSimulator) ReflectionUtils.newInstance( AMSimulator amSim = (AMSimulator) ReflectionUtils.newInstance(
amClassMap.get(jobType), new Configuration()); amClassMap.get(jobType), new Configuration());
@ -799,7 +806,7 @@ public class SLSRunner extends Configured implements Tool {
AM_ID++; AM_ID++;
amSim.init(heartbeatInterval, containerList, rm, this, jobStartTimeMS, amSim.init(heartbeatInterval, containerList, rm, this, jobStartTimeMS,
jobFinishTimeMS, user, jobQueue, isTracked, oldJobId, jobFinishTimeMS, user, jobQueue, isTracked, oldJobId,
runner.getStartTimeMS(), amContainerResource, params); runner.getStartTimeMS(), amContainerResource, labelExpr, params);
if(reservationId != null) { if(reservationId != null) {
// if we have a ReservationId, delegate reservation creation to // if we have a ReservationId, delegate reservation creation to
// AMSim (reservation shape is impl specific) // AMSim (reservation shape is impl specific)
@ -985,4 +992,42 @@ public class SLSRunner extends Configured implements Tool {
System.err.println(); System.err.println();
} }
/**
 * Class to encapsulate all details about the node.
 *
 * <p>A node is identified by its hostname (network path plus host name,
 * e.g. {@code /rack1/node1}). The resource and the label set are optional
 * and stay {@code null} until they are explicitly set.
 *
 * <p>Instances are collected in hash-based sets while parsing traces and
 * node files, so equality is defined on the hostname: two entries that
 * name the same host describe the same simulated node and must collapse
 * into one set element, exactly as the plain hostname strings did before
 * this class existed. Do not call {@link #setHostname(String)} on an
 * instance that is already stored in a hash-based collection.
 */
@Private
@Unstable
public static class NodeDetails {
  private String hostname;
  private Resource nodeResource;
  private Set<NodeLabel> labels;

  /**
   * Creates the details for a node that has no explicit resource and no
   * labels yet.
   *
   * @param nodeHostname hostname of the node, including the network path
   */
  public NodeDetails(String nodeHostname) {
    this.hostname = nodeHostname;
  }

  /**
   * @return hostname of the node
   */
  public String getHostname() {
    return hostname;
  }

  /**
   * @param hostname new hostname of the node
   */
  public void setHostname(String hostname) {
    this.hostname = hostname;
  }

  /**
   * @return resource of the node, or {@code null} if none was set
   */
  public Resource getNodeResource() {
    return nodeResource;
  }

  /**
   * @param nodeResource resource of the node
   */
  public void setNodeResource(Resource nodeResource) {
    this.nodeResource = nodeResource;
  }

  /**
   * @return labels of the node, or {@code null} if none were set
   */
  public Set<NodeLabel> getLabels() {
    return labels;
  }

  /**
   * @param labels labels of the node
   */
  public void setLabels(Set<NodeLabel> labels) {
    this.labels = labels;
  }

  /**
   * Two node details are equal when they refer to the same hostname.
   *
   * @param other object to compare with
   * @return {@code true} if {@code other} is a {@code NodeDetails} with an
   *         equal hostname
   */
  @Override
  public boolean equals(Object other) {
    if (this == other) {
      return true;
    }
    if (!(other instanceof NodeDetails)) {
      return false;
    }
    return java.util.Objects.equals(hostname,
        ((NodeDetails) other).hostname);
  }

  /**
   * @return hash code derived from the hostname, consistent with
   *         {@link #equals(Object)}
   */
  @Override
  public int hashCode() {
    return java.util.Objects.hashCode(hostname);
  }
}
} }

View File

@ -88,6 +88,8 @@ public abstract class AMSimulator extends TaskRunner.Task {
private int responseId = 0; private int responseId = 0;
// user name // user name
private String user; private String user;
// node label expression
private String nodeLabelExpression;
// queue name // queue name
protected String queue; protected String queue;
// am type // am type
@ -123,7 +125,8 @@ public abstract class AMSimulator extends TaskRunner.Task {
List<ContainerSimulator> containerList, ResourceManager resourceManager, List<ContainerSimulator> containerList, ResourceManager resourceManager,
SLSRunner slsRunnner, long startTime, long finishTime, String simUser, SLSRunner slsRunnner, long startTime, long finishTime, String simUser,
String simQueue, boolean tracked, String oldApp, long baseTimeMS, String simQueue, boolean tracked, String oldApp, long baseTimeMS,
Resource amResource, Map<String, String> params) { Resource amResource, String nodeLabelExpr,
Map<String, String> params) {
super.init(startTime, startTime + 1000000L * heartbeatInterval, super.init(startTime, startTime + 1000000L * heartbeatInterval,
heartbeatInterval); heartbeatInterval);
this.user = simUser; this.user = simUser;
@ -136,6 +139,7 @@ public abstract class AMSimulator extends TaskRunner.Task {
this.traceStartTimeMS = startTime; this.traceStartTimeMS = startTime;
this.traceFinishTimeMS = finishTime; this.traceFinishTimeMS = finishTime;
this.amContainerResource = amResource; this.amContainerResource = amResource;
this.nodeLabelExpression = nodeLabelExpr;
} }
/** /**
@ -327,6 +331,9 @@ public abstract class AMSimulator extends TaskRunner.Task {
conLauContext.setServiceData(new HashMap<>()); conLauContext.setServiceData(new HashMap<>());
appSubContext.setAMContainerSpec(conLauContext); appSubContext.setAMContainerSpec(conLauContext);
appSubContext.setResource(amContainerResource); appSubContext.setResource(amContainerResource);
if (nodeLabelExpression != null) {
appSubContext.setNodeLabelExpression(nodeLabelExpression);
}
if(reservationId != null) { if(reservationId != null) {
appSubContext.setReservationID(reservationId); appSubContext.setReservationID(reservationId);

View File

@ -126,10 +126,11 @@ public class MRAMSimulator extends AMSimulator {
List<ContainerSimulator> containerList, ResourceManager rm, SLSRunner se, List<ContainerSimulator> containerList, ResourceManager rm, SLSRunner se,
long traceStartTime, long traceFinishTime, String user, String queue, long traceStartTime, long traceFinishTime, String user, String queue,
boolean isTracked, String oldAppId, long baselineStartTimeMS, boolean isTracked, String oldAppId, long baselineStartTimeMS,
Resource amContainerResource, Map<String, String> params) { Resource amContainerResource, String nodeLabelExpr,
Map<String, String> params) {
super.init(heartbeatInterval, containerList, rm, se, super.init(heartbeatInterval, containerList, rm, se,
traceStartTime, traceFinishTime, user, queue, isTracked, oldAppId, traceStartTime, traceFinishTime, user, queue, isTracked, oldAppId,
baselineStartTimeMS, amContainerResource, params); baselineStartTimeMS, amContainerResource, nodeLabelExpr, params);
amtype = "mapreduce"; amtype = "mapreduce";
// get map/reduce tasks // get map/reduce tasks

View File

@ -96,10 +96,11 @@ public class StreamAMSimulator extends AMSimulator {
List<ContainerSimulator> containerList, ResourceManager rm, SLSRunner se, List<ContainerSimulator> containerList, ResourceManager rm, SLSRunner se,
long traceStartTime, long traceFinishTime, String user, String queue, long traceStartTime, long traceFinishTime, String user, String queue,
boolean isTracked, String oldAppId, long baselineStartTimeMS, boolean isTracked, String oldAppId, long baselineStartTimeMS,
Resource amContainerResource, Map<String, String> params) { Resource amContainerResource, String nodeLabelExpr,
Map<String, String> params) {
super.init(heartbeatInterval, containerList, rm, se, traceStartTime, super.init(heartbeatInterval, containerList, rm, se, traceStartTime,
traceFinishTime, user, queue, isTracked, oldAppId, baselineStartTimeMS, traceFinishTime, user, queue, isTracked, oldAppId, baselineStartTimeMS,
amContainerResource, params); amContainerResource, nodeLabelExpr, params);
amtype = "stream"; amtype = "stream";
allStreams.addAll(containerList); allStreams.addAll(containerList);

View File

@ -104,6 +104,7 @@ public class SLSConfiguration {
public static final String JOB_START_MS = JOB_PREFIX + "start.ms"; public static final String JOB_START_MS = JOB_PREFIX + "start.ms";
public static final String JOB_END_MS = JOB_PREFIX + "end.ms"; public static final String JOB_END_MS = JOB_PREFIX + "end.ms";
public static final String JOB_QUEUE_NAME = JOB_PREFIX + "queue.name"; public static final String JOB_QUEUE_NAME = JOB_PREFIX + "queue.name";
public static final String JOB_LABEL_EXPR = JOB_PREFIX + "label.expression";
public static final String JOB_USER = JOB_PREFIX + "user"; public static final String JOB_USER = JOB_PREFIX + "user";
public static final String JOB_COUNT = JOB_PREFIX + "count"; public static final String JOB_COUNT = JOB_PREFIX + "count";
public static final String JOB_TASKS = JOB_PREFIX + "tasks"; public static final String JOB_TASKS = JOB_PREFIX + "tasks";

View File

@ -23,6 +23,7 @@ import java.util.ArrayList;
import java.util.Collections; import java.util.Collections;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.DelayQueue; import java.util.concurrent.DelayQueue;
@ -35,6 +36,7 @@ import org.apache.hadoop.yarn.api.records.ContainerExitStatus;
import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerState; import org.apache.hadoop.yarn.api.records.ContainerState;
import org.apache.hadoop.yarn.api.records.ContainerStatus; import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.api.records.NodeLabel;
import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceUtilization; import org.apache.hadoop.yarn.api.records.ResourceUtilization;
import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.exceptions.YarnException;
@ -78,7 +80,7 @@ public class NMSimulator extends TaskRunner.Task {
public void init(String nodeIdStr, Resource nodeResource, int dispatchTime, public void init(String nodeIdStr, Resource nodeResource, int dispatchTime,
int heartBeatInterval, ResourceManager pRm, int heartBeatInterval, ResourceManager pRm,
float pResourceUtilizationRatio) float pResourceUtilizationRatio, Set<NodeLabel> labels)
throws IOException, YarnException { throws IOException, YarnException {
super.init(dispatchTime, dispatchTime + 1000000L * heartBeatInterval, super.init(dispatchTime, dispatchTime + 1000000L * heartBeatInterval,
heartBeatInterval); heartBeatInterval);
@ -102,6 +104,7 @@ public class NMSimulator extends TaskRunner.Task {
Records.newRecord(RegisterNodeManagerRequest.class); Records.newRecord(RegisterNodeManagerRequest.class);
req.setNodeId(node.getNodeID()); req.setNodeId(node.getNodeID());
req.setResource(node.getTotalCapability()); req.setResource(node.getTotalCapability());
req.setNodeLabels(labels);
req.setHttpPort(80); req.setHttpPort(80);
RegisterNodeManagerResponse response = this.rm.getResourceTrackerService() RegisterNodeManagerResponse response = this.rm.getResourceTrackerService()
.registerNodeManager(req); .registerNodeManager(req);
@ -109,6 +112,14 @@ public class NMSimulator extends TaskRunner.Task {
this.resourceUtilizationRatio = pResourceUtilizationRatio; this.resourceUtilizationRatio = pResourceUtilizationRatio;
} }
/**
 * Initializes this simulated node manager without any node labels.
 * Convenience overload that forwards every argument to the label-aware
 * {@code init} and passes {@code null} as the label set.
 *
 * @param nodeIdStr node identifier string
 * @param nodeResource resource of the node
 * @param dispatchTime dispatch time passed through to the label-aware init
 * @param heartBeatInterval heartbeat interval passed through unchanged
 * @param pRm resource manager passed through unchanged
 * @param pResourceUtilizationRatio resource utilization ratio
 * @throws IOException propagated from the label-aware init
 * @throws YarnException propagated from the label-aware init
 */
public void init(String nodeIdStr, Resource nodeResource, int dispatchTime,
    int heartBeatInterval, ResourceManager pRm,
    float pResourceUtilizationRatio)
    throws IOException, YarnException {
  // No labels for this node; the typed local documents the intent.
  final Set<NodeLabel> noLabels = null;
  this.init(nodeIdStr, nodeResource, dispatchTime, heartBeatInterval, pRm,
      pResourceUtilizationRatio, noLabels);
}
@Override @Override
public void firstStep() { public void firstStep() {
// do nothing // do nothing

View File

@ -23,7 +23,6 @@ import java.io.IOException;
import java.io.InputStreamReader; import java.io.InputStreamReader;
import java.io.Reader; import java.io.Reader;
import java.util.HashMap;
import java.util.HashSet; import java.util.HashSet;
import java.util.Iterator; import java.util.Iterator;
import java.util.List; import java.util.List;
@ -41,8 +40,11 @@ import org.apache.hadoop.tools.rumen.JobTraceReader;
import org.apache.hadoop.tools.rumen.LoggedJob; import org.apache.hadoop.tools.rumen.LoggedJob;
import org.apache.hadoop.tools.rumen.LoggedTask; import org.apache.hadoop.tools.rumen.LoggedTask;
import org.apache.hadoop.tools.rumen.LoggedTaskAttempt; import org.apache.hadoop.tools.rumen.LoggedTaskAttempt;
import org.apache.hadoop.yarn.api.records.NodeLabel;
import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceInformation; import org.apache.hadoop.yarn.api.records.ResourceInformation;
import org.apache.hadoop.yarn.client.util.YarnClientUtils;
import org.apache.hadoop.yarn.sls.SLSRunner.NodeDetails;
import org.apache.hadoop.yarn.sls.conf.SLSConfiguration; import org.apache.hadoop.yarn.sls.conf.SLSConfiguration;
import org.apache.hadoop.yarn.util.resource.ResourceUtils; import org.apache.hadoop.yarn.util.resource.ResourceUtils;
import org.apache.hadoop.yarn.util.resource.Resources; import org.apache.hadoop.yarn.util.resource.Resources;
@ -52,6 +54,10 @@ import org.apache.hadoop.yarn.util.resource.Resources;
public class SLSUtils { public class SLSUtils {
public final static String DEFAULT_JOB_TYPE = "mapreduce"; public final static String DEFAULT_JOB_TYPE = "mapreduce";
private static final String LABEL_FORMAT_ERR_MSG =
"Input format for adding node-labels is not correct, it should be "
+ "labelName1[(exclusive=true/false)],labelName2[] ..";
// hostname includes the network path and the host name. for example // hostname includes the network path and the host name. for example
// "/default-rack/hostFoo" or "/coreSwitchA/TORSwitchB/hostBar". // "/default-rack/hostFoo" or "/coreSwitchA/TORSwitchB/hostBar".
// the function returns two Strings, the first element is the network // the function returns two Strings, the first element is the network
@ -66,9 +72,9 @@ public class SLSUtils {
/** /**
* parse the rumen trace file, return each host name * parse the rumen trace file, return each host name
*/ */
public static Set<String> parseNodesFromRumenTrace(String jobTrace) public static Set<NodeDetails> parseNodesFromRumenTrace(
throws IOException { String jobTrace) throws IOException {
Set<String> nodeSet = new HashSet<String>(); Set<NodeDetails> nodeSet = new HashSet<>();
File fin = new File(jobTrace); File fin = new File(jobTrace);
Configuration conf = new Configuration(); Configuration conf = new Configuration();
@ -85,7 +91,8 @@ public class SLSUtils {
} }
LoggedTaskAttempt taskAttempt = mapTask.getAttempts() LoggedTaskAttempt taskAttempt = mapTask.getAttempts()
.get(mapTask.getAttempts().size() - 1); .get(mapTask.getAttempts().size() - 1);
nodeSet.add(taskAttempt.getHostName().getValue()); nodeSet.add(new NodeDetails(
taskAttempt.getHostName().getValue()));
} }
for(LoggedTask reduceTask : job.getReduceTasks()) { for(LoggedTask reduceTask : job.getReduceTasks()) {
if (reduceTask.getAttempts().size() == 0) { if (reduceTask.getAttempts().size() == 0) {
@ -93,7 +100,8 @@ public class SLSUtils {
} }
LoggedTaskAttempt taskAttempt = reduceTask.getAttempts() LoggedTaskAttempt taskAttempt = reduceTask.getAttempts()
.get(reduceTask.getAttempts().size() - 1); .get(reduceTask.getAttempts().size() - 1);
nodeSet.add(taskAttempt.getHostName().getValue()); nodeSet.add(new NodeDetails(
taskAttempt.getHostName().getValue()));
} }
} }
} finally { } finally {
@ -106,9 +114,9 @@ public class SLSUtils {
/** /**
* parse the sls trace file, return each host name * parse the sls trace file, return each host name
*/ */
public static Set<String> parseNodesFromSLSTrace(String jobTrace) public static Set<NodeDetails> parseNodesFromSLSTrace(
throws IOException { String jobTrace) throws IOException {
Set<String> nodeSet = new HashSet<>(); Set<NodeDetails> nodeSet = new HashSet<>();
JsonFactory jsonF = new JsonFactory(); JsonFactory jsonF = new JsonFactory();
ObjectMapper mapper = new ObjectMapper(); ObjectMapper mapper = new ObjectMapper();
Reader input = Reader input =
@ -124,7 +132,8 @@ public class SLSUtils {
return nodeSet; return nodeSet;
} }
private static void addNodes(Set<String> nodeSet, Map jsonEntry) { private static void addNodes(Set<NodeDetails> nodeSet,
Map jsonEntry) {
if (jsonEntry.containsKey(SLSConfiguration.NUM_NODES)) { if (jsonEntry.containsKey(SLSConfiguration.NUM_NODES)) {
int numNodes = Integer.parseInt( int numNodes = Integer.parseInt(
jsonEntry.get(SLSConfiguration.NUM_NODES).toString()); jsonEntry.get(SLSConfiguration.NUM_NODES).toString());
@ -142,7 +151,7 @@ public class SLSUtils {
Map jsonTask = (Map) o; Map jsonTask = (Map) o;
String hostname = (String) jsonTask.get(SLSConfiguration.TASK_HOST); String hostname = (String) jsonTask.get(SLSConfiguration.TASK_HOST);
if (hostname != null) { if (hostname != null) {
nodeSet.add(hostname); nodeSet.add(new NodeDetails(hostname));
} }
} }
} }
@ -150,10 +159,11 @@ public class SLSUtils {
/** /**
* parse the input node file, return each host name * parse the input node file, return each host name
* sample input: label1(exclusive=true),label2(exclusive=false),label3
*/ */
public static Map<String, Resource> parseNodesFromNodeFile(String nodeFile, public static Set<NodeDetails> parseNodesFromNodeFile(
Resource nmDefaultResource) throws IOException { String nodeFile, Resource nmDefaultResource) throws IOException {
Map<String, Resource> nodeResourceMap = new HashMap<>(); Set<NodeDetails> nodeSet = new HashSet<>();
JsonFactory jsonF = new JsonFactory(); JsonFactory jsonF = new JsonFactory();
ObjectMapper mapper = new ObjectMapper(); ObjectMapper mapper = new ObjectMapper();
Reader input = Reader input =
@ -166,6 +176,8 @@ public class SLSUtils {
List tasks = (List) jsonE.get("nodes"); List tasks = (List) jsonE.get("nodes");
for (Object o : tasks) { for (Object o : tasks) {
Map jsonNode = (Map) o; Map jsonNode = (Map) o;
NodeDetails nodeDetails = new NodeDetails(
rack + "/" + jsonNode.get("node"));
Resource nodeResource = Resources.clone(nmDefaultResource); Resource nodeResource = Resources.clone(nmDefaultResource);
ResourceInformation[] infors = ResourceUtils.getResourceTypesArray(); ResourceInformation[] infors = ResourceUtils.getResourceTypesArray();
for (ResourceInformation info : infors) { for (ResourceInformation info : infors) {
@ -174,18 +186,25 @@ public class SLSUtils {
Integer.parseInt(jsonNode.get(info.getName()).toString())); Integer.parseInt(jsonNode.get(info.getName()).toString()));
} }
} }
nodeResourceMap.put(rack + "/" + jsonNode.get("node"), nodeResource); nodeDetails.setNodeResource(nodeResource);
if (jsonNode.get("labels") != null) {
Set<NodeLabel> nodeLabels = new HashSet<>(
YarnClientUtils.buildNodeLabelsFromStr(
jsonNode.get("labels").toString()));
nodeDetails.setLabels(nodeLabels);
}
nodeSet.add(nodeDetails);
} }
} }
} finally { } finally {
input.close(); input.close();
} }
return nodeResourceMap; return nodeSet;
} }
public static Set<? extends String> generateNodes(int numNodes, public static Set<NodeDetails> generateNodes(int numNodes,
int numRacks){ int numRacks){
Set<String> nodeSet = new HashSet<>(); Set<NodeDetails> nodeSet = new HashSet<>();
if (numRacks < 1) { if (numRacks < 1) {
numRacks = 1; numRacks = 1;
} }
@ -195,7 +214,8 @@ public class SLSUtils {
} }
for (int i = 0; i < numNodes; i++) { for (int i = 0; i < numNodes; i++) {
nodeSet.add("/rack" + i % numRacks + "/node" + i); nodeSet.add(new NodeDetails(
"/rack" + i % numRacks + "/node" + i));
} }
return nodeSet; return nodeSet;
} }

View File

@ -19,10 +19,13 @@ package org.apache.hadoop.yarn.sls.appmaster;
import com.codahale.metrics.MetricRegistry; import com.codahale.metrics.MetricRegistry;
import org.apache.commons.io.FileUtils; import org.apache.commons.io.FileUtils;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ReservationId; import org.apache.hadoop.yarn.api.records.ReservationId;
import org.apache.hadoop.yarn.client.cli.RMAdminCLI;
import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager; import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairScheduler; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairScheduler;
import org.apache.hadoop.yarn.sls.conf.SLSConfiguration; import org.apache.hadoop.yarn.sls.conf.SLSConfiguration;
@ -42,6 +45,7 @@ import java.util.ArrayList;
import java.util.Arrays; import java.util.Arrays;
import java.util.Collection; import java.util.Collection;
import java.util.List; import java.util.List;
import java.util.concurrent.ConcurrentMap;
@RunWith(Parameterized.class) @RunWith(Parameterized.class)
public class TestAMSimulator { public class TestAMSimulator {
@ -73,6 +77,7 @@ public class TestAMSimulator {
conf.set(SLSConfiguration.METRICS_OUTPUT_DIR, metricOutputDir.toString()); conf.set(SLSConfiguration.METRICS_OUTPUT_DIR, metricOutputDir.toString());
conf.set(YarnConfiguration.RM_SCHEDULER, slsScheduler.getName()); conf.set(YarnConfiguration.RM_SCHEDULER, slsScheduler.getName());
conf.set(SLSConfiguration.RM_SCHEDULER, scheduler.getName()); conf.set(SLSConfiguration.RM_SCHEDULER, scheduler.getName());
conf.set(YarnConfiguration.NODE_LABELS_ENABLED, "true");
conf.setBoolean(SLSConfiguration.METRICS_SWITCH, true); conf.setBoolean(SLSConfiguration.METRICS_SWITCH, true);
rm = new ResourceManager(); rm = new ResourceManager();
rm.init(conf); rm.init(conf);
@ -140,7 +145,7 @@ public class TestAMSimulator {
String queue = "default"; String queue = "default";
List<ContainerSimulator> containers = new ArrayList<>(); List<ContainerSimulator> containers = new ArrayList<>();
app.init(1000, containers, rm, null, 0, 1000000L, "user1", queue, true, app.init(1000, containers, rm, null, 0, 1000000L, "user1", queue, true,
appId, 0, SLSConfiguration.getAMContainerResource(conf), null); appId, 0, SLSConfiguration.getAMContainerResource(conf), null, null);
app.firstStep(); app.firstStep();
verifySchedulerMetrics(appId); verifySchedulerMetrics(appId);
@ -152,6 +157,34 @@ public class TestAMSimulator {
app.lastStep(); app.lastStep();
} }
/**
 * Submits a simulated AM with the node label expression "label1" and
 * checks that the resulting RM application carries that expression as
 * its AM node label expression. The body only runs when the test is
 * parameterized with the CapacityScheduler; for any other scheduler the
 * method returns without asserting anything.
 */
@Test
public void testAMSimulatorWithNodeLabels() throws Exception {
  if (!scheduler.equals(CapacityScheduler.class)) {
    return;
  }

  // add label to the cluster
  String[] addLabelArgs = {"-addToClusterNodeLabels", "label1"};
  RMAdminCLI adminCli = new RMAdminCLI(conf);
  adminCli.run(addLabelArgs);

  String appId = "app1";
  String queue = "default";
  List<ContainerSimulator> noContainers = new ArrayList<>();
  MockAMSimulator amSim = new MockAMSimulator();
  amSim.init(1000, noContainers, rm, null, 0, 1000000L, "user1", queue,
      true, appId, 0, SLSConfiguration.getAMContainerResource(conf),
      "label1", null);
  amSim.firstStep();

  verifySchedulerMetrics(appId);

  // exactly one application must be known to the RM, and it must carry
  // the label expression the simulator was initialized with
  ConcurrentMap<ApplicationId, RMApp> submittedApps =
      rm.getRMContext().getRMApps();
  Assert.assertEquals(1, submittedApps.size());

  RMApp submitted = submittedApps.get(amSim.appId);
  Assert.assertNotNull(submitted);
  Assert.assertEquals("label1", submitted.getAmNodeLabelExpression());
}
@After @After
public void tearDown() { public void tearDown() {
if (rm != null) { if (rm != null) {

View File

@ -18,13 +18,13 @@
package org.apache.hadoop.yarn.sls.utils; package org.apache.hadoop.yarn.sls.utils;
import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.NodeLabel;
import org.apache.hadoop.yarn.sls.SLSRunner.NodeDetails;
import org.apache.hadoop.yarn.util.resource.Resources; import org.apache.hadoop.yarn.util.resource.Resources;
import org.junit.Assert; import org.junit.Assert;
import org.junit.Test; import org.junit.Test;
import java.util.HashSet; import java.util.HashSet;
import java.util.Map;
import java.util.Set; import java.util.Set;
public class TestSLSUtils { public class TestSLSUtils {
@ -45,28 +45,54 @@ public class TestSLSUtils {
@Test @Test
public void testParseNodesFromNodeFile() throws Exception { public void testParseNodesFromNodeFile() throws Exception {
String nodeFile = "src/test/resources/nodes.json"; String nodeFile = "src/test/resources/nodes.json";
Map<String, Resource> nodeResourceMap = SLSUtils.parseNodesFromNodeFile( Set<NodeDetails> nodeDetails = SLSUtils.parseNodesFromNodeFile(
nodeFile, Resources.createResource(1024, 2)); nodeFile, Resources.createResource(1024, 2));
Assert.assertEquals(20, nodeResourceMap.size()); Assert.assertEquals(20, nodeDetails.size());
nodeFile = "src/test/resources/nodes-with-resources.json"; nodeFile = "src/test/resources/nodes-with-resources.json";
nodeResourceMap = SLSUtils.parseNodesFromNodeFile( nodeDetails = SLSUtils.parseNodesFromNodeFile(
nodeFile, Resources.createResource(1024, 2)); nodeFile, Resources.createResource(1024, 2));
Assert.assertEquals(4, Assert.assertEquals(4, nodeDetails.size());
nodeResourceMap.size()); for (NodeDetails nodeDetail : nodeDetails) {
Assert.assertEquals(2048, if (nodeDetail.getHostname().equals("/rack1/node1")) {
nodeResourceMap.get("/rack1/node1").getMemorySize()); Assert.assertEquals(2048,
Assert.assertEquals(6, nodeDetail.getNodeResource().getMemorySize());
nodeResourceMap.get("/rack1/node1").getVirtualCores()); Assert.assertEquals(6,
Assert.assertEquals(1024, nodeDetail.getNodeResource().getVirtualCores());
nodeResourceMap.get("/rack1/node2").getMemorySize()); } else if (nodeDetail.getHostname().equals("/rack1/node2")) {
Assert.assertEquals(2, Assert.assertEquals(1024,
nodeResourceMap.get("/rack1/node2").getVirtualCores()); nodeDetail.getNodeResource().getMemorySize());
Assert.assertEquals(2,
nodeDetail.getNodeResource().getVirtualCores());
Assert.assertNull(nodeDetail.getLabels());
} else if (nodeDetail.getHostname().equals("/rack1/node3")) {
Assert.assertEquals(1024,
nodeDetail.getNodeResource().getMemorySize());
Assert.assertEquals(2,
nodeDetail.getNodeResource().getVirtualCores());
Assert.assertEquals(2, nodeDetail.getLabels().size());
for (NodeLabel nodeLabel : nodeDetail.getLabels()) {
if (nodeLabel.getName().equals("label1")) {
Assert.assertTrue(nodeLabel.isExclusive());
} else if(nodeLabel.getName().equals("label2")) {
Assert.assertFalse(nodeLabel.isExclusive());
} else {
Assert.assertTrue("Unexepected label", false);
}
}
} else if (nodeDetail.getHostname().equals("/rack1/node4")) {
Assert.assertEquals(6144,
nodeDetail.getNodeResource().getMemorySize());
Assert.assertEquals(12,
nodeDetail.getNodeResource().getVirtualCores());
Assert.assertEquals(2, nodeDetail.getLabels().size());
}
}
} }
@Test @Test
public void testGenerateNodes() { public void testGenerateNodes() {
Set<? extends String> nodes = SLSUtils.generateNodes(3, 3); Set<NodeDetails> nodes = SLSUtils.generateNodes(3, 3);
Assert.assertEquals("Number of nodes is wrong.", 3, nodes.size()); Assert.assertEquals("Number of nodes is wrong.", 3, nodes.size());
Assert.assertEquals("Number of racks is wrong.", 3, getNumRack(nodes)); Assert.assertEquals("Number of racks is wrong.", 3, getNumRack(nodes));
@ -83,10 +109,10 @@ public class TestSLSUtils {
Assert.assertEquals("Number of racks is wrong.", 1, getNumRack(nodes)); Assert.assertEquals("Number of racks is wrong.", 1, getNumRack(nodes));
} }
private int getNumRack(Set<? extends String> nodes) { private int getNumRack(Set<NodeDetails> nodes) {
Set<String> racks = new HashSet<>(); Set<String> racks = new HashSet<>();
for (String node : nodes) { for (NodeDetails node : nodes) {
String[] rackHostname = SLSUtils.getRackHostName(node); String[] rackHostname = SLSUtils.getRackHostName(node.getHostname());
racks.add(rackHostname[0]); racks.add(rackHostname[0]);
} }
return racks.size(); return racks.size();

View File

@ -10,10 +10,14 @@
"node": "node2" "node": "node2"
}, },
{ {
"node": "node3" "node": "node3",
"labels": "label1, label2(exclusive=false)"
}, },
{ {
"node": "node4" "node": "node4",
"labels": "label1, label2(exclusive=false)",
"memory-mb" : 6144,
"vcores" : 12
} }
] ]
} }

View File

@ -20,7 +20,6 @@ package org.apache.hadoop.yarn.client.cli;
import java.io.IOException; import java.io.IOException;
import java.io.PrintStream; import java.io.PrintStream;
import java.util.ArrayList;
import java.util.Arrays; import java.util.Arrays;
import java.util.Collection; import java.util.Collection;
import java.util.HashMap; import java.util.HashMap;
@ -54,6 +53,7 @@ import org.apache.hadoop.yarn.api.records.ResourceInformation;
import org.apache.hadoop.yarn.api.records.ResourceOption; import org.apache.hadoop.yarn.api.records.ResourceOption;
import org.apache.hadoop.yarn.client.ClientRMProxy; import org.apache.hadoop.yarn.client.ClientRMProxy;
import org.apache.hadoop.yarn.client.RMHAServiceTarget; import org.apache.hadoop.yarn.client.RMHAServiceTarget;
import org.apache.hadoop.yarn.client.util.YarnClientUtils;
import org.apache.hadoop.yarn.conf.HAUtil; import org.apache.hadoop.yarn.conf.HAUtil;
import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.exceptions.YarnException;
@ -82,7 +82,8 @@ import org.apache.hadoop.yarn.util.resource.Resources;
import com.google.common.base.Preconditions; import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import static org.apache.hadoop.yarn.client.util.YarnClientUtils.NO_LABEL_ERR_MSG;
@Private @Private
@Unstable @Unstable
@ -91,15 +92,10 @@ public class RMAdminCLI extends HAAdmin {
private final RecordFactory recordFactory = private final RecordFactory recordFactory =
RecordFactoryProvider.getRecordFactory(null); RecordFactoryProvider.getRecordFactory(null);
static CommonNodeLabelsManager localNodeLabelsManager = null; static CommonNodeLabelsManager localNodeLabelsManager = null;
private static final String NO_LABEL_ERR_MSG =
"No cluster node-labels are specified";
private static final String NO_MAPPING_ERR_MSG = private static final String NO_MAPPING_ERR_MSG =
"No node-to-labels mappings are specified"; "No node-to-labels mappings are specified";
private static final String INVALID_TIMEOUT_ERR_MSG = private static final String INVALID_TIMEOUT_ERR_MSG =
"Invalid timeout specified : "; "Invalid timeout specified : ";
private static final String ADD_LABEL_FORMAT_ERR_MSG =
"Input format for adding node-labels is not correct, it should be "
+ "labelName1[(exclusive=true/false)],LabelName2[] ..";
private static final Pattern RESOURCE_TYPES_ARGS_PATTERN = private static final Pattern RESOURCE_TYPES_ARGS_PATTERN =
Pattern.compile("^[0-9]*$"); Pattern.compile("^[0-9]*$");
@ -533,65 +529,6 @@ public class RMAdminCLI extends HAAdmin {
} }
return localNodeLabelsManager; return localNodeLabelsManager;
} }
/**
 * Creates node labels from a comma-separated string, for example
 * {@code "label1, label2(exclusive=false)"}.
 *
 * <p>Blank tokens are skipped. Every other token is a label name that may
 * be followed by a parenthesised property; the only supported property is
 * {@code exclusive=true} or {@code exclusive=false}. Empty parentheses are
 * accepted and leave the exclusivity at
 * {@link NodeLabel#DEFAULT_NODE_LABEL_EXCLUSIVITY}. Whitespace around the
 * label name, the property key and the property value is ignored.
 *
 * @param args nodelabels string to be parsed; must not be {@code null}
 * @return list of node labels, in the order they appear in {@code args}
 * @throws IllegalArgumentException if a token has unbalanced or misordered
 *     parentheses, an unsupported property, or if {@code args} contains no
 *     label at all
 */
private List<NodeLabel> buildNodeLabelsFromStr(String args) {
  List<NodeLabel> nodeLabels = new ArrayList<>();
  for (String p : args.split(",")) {
    // Parse the trimmed token so that "a, b" yields "b" rather than " b",
    // exactly as "a, b()" already did.
    String token = p.trim();
    if (token.isEmpty()) {
      continue;
    }

    String labelName = token;
    // Try to parse exclusive
    boolean exclusive = NodeLabel.DEFAULT_NODE_LABEL_EXCLUSIVITY;

    int leftParenthesisIdx = token.indexOf('(');
    int rightParenthesisIdx = token.indexOf(')');

    if ((leftParenthesisIdx == -1) != (rightParenthesisIdx == -1)) {
      // Parentheses not match: only one of the two is present
      throw new IllegalArgumentException(ADD_LABEL_FORMAT_ERR_MSG);
    }

    // Both parentheses are present from here on. The check is "!= -1"
    // rather than "> 0" so that a token starting with "(" is validated
    // too instead of having its property silently ignored.
    if (leftParenthesisIdx != -1) {
      if (leftParenthesisIdx > rightParenthesisIdx) {
        // Parentheses not match: ")" comes before "("
        throw new IllegalArgumentException(ADD_LABEL_FORMAT_ERR_MSG);
      }

      String property = token
          .substring(leftParenthesisIdx + 1, rightParenthesisIdx).trim();
      int equalsIdx = property.indexOf('=');
      if (equalsIdx != -1) {
        String key = property.substring(0, equalsIdx).trim();
        String value = property.substring(equalsIdx + 1).trim();

        // Now we only support one property, which is exclusive, so check if
        // key = exclusive and value = {true/false}
        if (key.equals("exclusive")
            && ImmutableSet.of("true", "false").contains(value)) {
          exclusive = Boolean.parseBoolean(value);
        } else {
          throw new IllegalArgumentException(ADD_LABEL_FORMAT_ERR_MSG);
        }
      } else if (!property.isEmpty()) {
        throw new IllegalArgumentException(ADD_LABEL_FORMAT_ERR_MSG);
      }

      // The label name is whatever precedes "(..)"
      labelName = token.substring(0, leftParenthesisIdx).trim();
    }

    nodeLabels.add(NodeLabel.newInstance(labelName, exclusive));
  }

  if (nodeLabels.isEmpty()) {
    throw new IllegalArgumentException(NO_LABEL_ERR_MSG);
  }
  return nodeLabels;
}
private Set<String> buildNodeLabelNamesFromStr(String args) { private Set<String> buildNodeLabelNamesFromStr(String args) {
Set<String> labels = new HashSet<String>(); Set<String> labels = new HashSet<String>();
@ -624,7 +561,7 @@ public class RMAdminCLI extends HAAdmin {
return exitCode; return exitCode;
} }
List<NodeLabel> labels = buildNodeLabelsFromStr( List<NodeLabel> labels = YarnClientUtils.buildNodeLabelsFromStr(
cliParser.getOptionValue("addToClusterNodeLabels")); cliParser.getOptionValue("addToClusterNodeLabels"));
if (cliParser.hasOption("directlyAccessNodeLabelStore")) { if (cliParser.hasOption("directlyAccessNodeLabelStore")) {
getNodeLabelManagerInstance(getConf()).addToCluserNodeLabels(labels); getNodeLabelManagerInstance(getConf()).addToCluserNodeLabels(labels);

View File

@ -19,8 +19,13 @@ package org.apache.hadoop.yarn.client.util;
import com.google.common.annotations.VisibleForTesting; import com.google.common.annotations.VisibleForTesting;
import java.io.IOException; import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import com.google.common.collect.ImmutableSet;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.security.SecurityUtil; import org.apache.hadoop.security.SecurityUtil;
import org.apache.hadoop.yarn.api.records.NodeLabel;
import org.apache.hadoop.yarn.conf.HAUtil; import org.apache.hadoop.yarn.conf.HAUtil;
import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.conf.YarnConfiguration;
@ -29,6 +34,14 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration;
* YARN clients. * YARN clients.
*/ */
public abstract class YarnClientUtils { public abstract class YarnClientUtils {
/**
 * Message of the {@link IllegalArgumentException} thrown by
 * {@link #buildNodeLabelsFromStr(String)} when a label token does not
 * follow the expected format.
 */
private static final String ADD_LABEL_FORMAT_ERR_MSG =
"Input format for adding node-labels is not correct, it should be "
+ "labelName1[(exclusive=true/false)],LabelName2[] ..";
/**
 * Message of the {@link IllegalArgumentException} thrown by
 * {@link #buildNodeLabelsFromStr(String)} when the input yields no label.
 * Public because RMAdminCLI statically imports it.
 */
public static final String NO_LABEL_ERR_MSG =
"No cluster node-labels are specified";
/** /**
* Look up and return the resource manager's principal. This method * Look up and return the resource manager's principal. This method
* automatically does the <code>_HOST</code> replacement in the principal and * automatically does the <code>_HOST</code> replacement in the principal and
@ -79,6 +92,70 @@ public abstract class YarnClientUtils {
return SecurityUtil.getServerPrincipal(rmPrincipal, hostname); return SecurityUtil.getServerPrincipal(rmPrincipal, hostname);
} }
/**
 * Creates node labels from a comma-separated string, for example
 * {@code "label1, label2(exclusive=false)"}.
 *
 * <p>Blank tokens are skipped. Every other token is a label name that may
 * be followed by a parenthesised property; the only supported property is
 * {@code exclusive=true} or {@code exclusive=false}. Empty parentheses are
 * accepted and leave the exclusivity at
 * {@link NodeLabel#DEFAULT_NODE_LABEL_EXCLUSIVITY}. Whitespace around the
 * label name, the property key and the property value is ignored.
 *
 * @param args nodelabels string to be parsed; must not be {@code null}
 * @return list of node labels, in the order they appear in {@code args}
 * @throws IllegalArgumentException if a token has unbalanced or misordered
 *     parentheses, an unsupported property, or if {@code args} contains no
 *     label at all
 */
public static List<NodeLabel> buildNodeLabelsFromStr(String args) {
  List<NodeLabel> nodeLabels = new ArrayList<>();
  for (String p : args.split(",")) {
    // Parse the trimmed token so that "a, b" yields "b" rather than " b",
    // exactly as "a, b()" already did.
    String token = p.trim();
    if (token.isEmpty()) {
      continue;
    }

    String labelName = token;
    // Try to parse exclusive
    boolean exclusive = NodeLabel.DEFAULT_NODE_LABEL_EXCLUSIVITY;

    int leftParenthesisIdx = token.indexOf('(');
    int rightParenthesisIdx = token.indexOf(')');

    if ((leftParenthesisIdx == -1) != (rightParenthesisIdx == -1)) {
      // Parentheses not match: only one of the two is present
      throw new IllegalArgumentException(ADD_LABEL_FORMAT_ERR_MSG);
    }

    // Both parentheses are present from here on. The check is "!= -1"
    // rather than "> 0" so that a token starting with "(" is validated
    // too instead of having its property silently ignored.
    if (leftParenthesisIdx != -1) {
      if (leftParenthesisIdx > rightParenthesisIdx) {
        // Parentheses not match: ")" comes before "("
        throw new IllegalArgumentException(ADD_LABEL_FORMAT_ERR_MSG);
      }

      String property = token
          .substring(leftParenthesisIdx + 1, rightParenthesisIdx).trim();
      int equalsIdx = property.indexOf('=');
      if (equalsIdx != -1) {
        String key = property.substring(0, equalsIdx).trim();
        String value = property.substring(equalsIdx + 1).trim();

        // Now we only support one property, which is exclusive, so check if
        // key = exclusive and value = {true/false}
        if (key.equals("exclusive")
            && ImmutableSet.of("true", "false").contains(value)) {
          exclusive = Boolean.parseBoolean(value);
        } else {
          throw new IllegalArgumentException(ADD_LABEL_FORMAT_ERR_MSG);
        }
      } else if (!property.isEmpty()) {
        throw new IllegalArgumentException(ADD_LABEL_FORMAT_ERR_MSG);
      }

      // The label name is whatever precedes "(..)"
      labelName = token.substring(0, leftParenthesisIdx).trim();
    }

    nodeLabels.add(NodeLabel.newInstance(labelName, exclusive));
  }

  if (nodeLabels.isEmpty()) {
    throw new IllegalArgumentException(NO_LABEL_ERR_MSG);
  }
  return nodeLabels;
}
/** /**
* Returns a {@link YarnConfiguration} built from the {@code conf} parameter * Returns a {@link YarnConfiguration} built from the {@code conf} parameter
* that is guaranteed to have the {@link YarnConfiguration#RM_HA_ID} * that is guaranteed to have the {@link YarnConfiguration#RM_HA_ID}