YARN-7413. Support resource type in SLS (Contributed by Yufei Gu via Daniel Templeton)

Change-Id: Ic0a897c123c5d2f57aae757ca6bcf1dad7b90d2b
(cherry picked from commit ba8136615a)
This commit is contained in:
Daniel Templeton 2017-11-09 12:09:48 -08:00 committed by Yufei Gu
parent ddef7d05a4
commit 38f5d4cbba
5 changed files with 117 additions and 41 deletions

View File

@ -62,6 +62,7 @@ import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.NodeState; import org.apache.hadoop.yarn.api.records.NodeState;
import org.apache.hadoop.yarn.api.records.ReservationId; import org.apache.hadoop.yarn.api.records.ReservationId;
import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceInformation;
import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager; import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
@ -84,6 +85,7 @@ import org.apache.hadoop.yarn.sls.synthetic.SynthJob;
import org.apache.hadoop.yarn.sls.synthetic.SynthTraceJobProducer; import org.apache.hadoop.yarn.sls.synthetic.SynthTraceJobProducer;
import org.apache.hadoop.yarn.sls.utils.SLSUtils; import org.apache.hadoop.yarn.sls.utils.SLSUtils;
import org.apache.hadoop.yarn.util.UTCClock; import org.apache.hadoop.yarn.util.UTCClock;
import org.apache.hadoop.yarn.util.resource.ResourceUtils;
import org.apache.hadoop.yarn.util.resource.Resources; import org.apache.hadoop.yarn.util.resource.Resources;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
@ -99,7 +101,7 @@ public class SLSRunner extends Configured implements Tool {
// NM simulator // NM simulator
private HashMap<NodeId, NMSimulator> nmMap; private HashMap<NodeId, NMSimulator> nmMap;
private int nmMemoryMB, nmVCores; private Resource nodeManagerResource;
private String nodeFile; private String nodeFile;
// AM simulator // AM simulator
@ -178,6 +180,30 @@ public class SLSRunner extends Configured implements Tool {
amClassMap.put(amType, Class.forName(tempConf.get(key))); amClassMap.put(amType, Class.forName(tempConf.get(key)));
} }
} }
nodeManagerResource = getNodeManagerResource();
}
private Resource getNodeManagerResource() {
Resource resource = Resources.createResource(0);
ResourceInformation[] infors = ResourceUtils.getResourceTypesArray();
for (ResourceInformation info : infors) {
long value;
if (info.getName().equals(ResourceInformation.MEMORY_URI)) {
value = getConf().getInt(SLSConfiguration.NM_MEMORY_MB,
SLSConfiguration.NM_MEMORY_MB_DEFAULT);
} else if (info.getName().equals(ResourceInformation.VCORES_URI)) {
value = getConf().getInt(SLSConfiguration.NM_VCORES,
SLSConfiguration.NM_VCORES_DEFAULT);
} else {
value = getConf().getLong(SLSConfiguration.NM_PREFIX +
info.getName(), SLSConfiguration.NM_RESOURCE_DEFAULT);
}
resource.setResourceValue(info.getName(), value);
}
return resource;
} }
/** /**
@ -261,10 +287,6 @@ public class SLSRunner extends Configured implements Tool {
private void startNM() throws YarnException, IOException { private void startNM() throws YarnException, IOException {
// nm configuration // nm configuration
nmMemoryMB = getConf().getInt(SLSConfiguration.NM_MEMORY_MB,
SLSConfiguration.NM_MEMORY_MB_DEFAULT);
nmVCores = getConf().getInt(SLSConfiguration.NM_VCORES,
SLSConfiguration.NM_VCORES_DEFAULT);
int heartbeatInterval = int heartbeatInterval =
getConf().getInt(SLSConfiguration.NM_HEARTBEAT_INTERVAL_MS, getConf().getInt(SLSConfiguration.NM_HEARTBEAT_INTERVAL_MS,
SLSConfiguration.NM_HEARTBEAT_INTERVAL_MS_DEFAULT); SLSConfiguration.NM_HEARTBEAT_INTERVAL_MS_DEFAULT);
@ -304,7 +326,7 @@ public class SLSRunner extends Configured implements Tool {
for (String hostName : nodeSet) { for (String hostName : nodeSet) {
// we randomize the heartbeat start time from zero to 1 interval // we randomize the heartbeat start time from zero to 1 interval
NMSimulator nm = new NMSimulator(); NMSimulator nm = new NMSimulator();
nm.init(hostName, nmMemoryMB, nmVCores, random.nextInt(heartbeatInterval), nm.init(hostName, nodeManagerResource, random.nextInt(heartbeatInterval),
heartbeatInterval, rm); heartbeatInterval, rm);
nmMap.put(nm.getNode().getNodeID(), nm); nmMap.put(nm.getNode().getNodeID(), nm);
runner.schedule(nm); runner.schedule(nm);
@ -460,18 +482,7 @@ public class SLSRunner extends Configured implements Tool {
+ " to 0!"); + " to 0!");
} }
Resource res = getDefaultContainerResource(); Resource res = getResourceForContainer(jsonTask);
if (jsonTask.containsKey(SLSConfiguration.TASK_MEMORY)) {
int containerMemory = Integer.parseInt(
jsonTask.get(SLSConfiguration.TASK_MEMORY).toString());
res.setMemorySize(containerMemory);
}
if (jsonTask.containsKey(SLSConfiguration.CONTAINER_VCORES)) {
int containerVCores = Integer.parseInt(
jsonTask.get(SLSConfiguration.CONTAINER_VCORES).toString());
res.setVirtualCores(containerVCores);
}
int priority = DEFAULT_MAPPER_PRIORITY; int priority = DEFAULT_MAPPER_PRIORITY;
if (jsonTask.containsKey(SLSConfiguration.TASK_PRIORITY)) { if (jsonTask.containsKey(SLSConfiguration.TASK_PRIORITY)) {
@ -500,6 +511,21 @@ public class SLSRunner extends Configured implements Tool {
return containers; return containers;
} }
private Resource getResourceForContainer(Map jsonTask) {
Resource res = getDefaultContainerResource();
ResourceInformation[] infors = ResourceUtils.getResourceTypesArray();
for (ResourceInformation info : infors) {
if (jsonTask.containsKey(SLSConfiguration.TASK_PREFIX + info.getName())) {
long value = Long.parseLong(
jsonTask.get(SLSConfiguration.TASK_PREFIX + info.getName())
.toString());
res.setResourceValue(info.getName(), value);
}
}
return res;
}
/** /**
* Parse workload from a rumen trace file. * Parse workload from a rumen trace file.
*/ */
@ -717,15 +743,15 @@ public class SLSRunner extends Configured implements Tool {
return amContainerResource; return amContainerResource;
} }
if (jsonJob.containsKey(SLSConfiguration.AM_MEMORY)) { ResourceInformation[] infors = ResourceUtils.getResourceTypesArray();
amContainerResource.setMemorySize( for (ResourceInformation info : infors) {
Long.parseLong(jsonJob.get(SLSConfiguration.AM_MEMORY).toString())); String key = SLSConfiguration.JOB_AM_PREFIX + info.getName();
if (jsonJob.containsKey(key)) {
long value = Long.parseLong(jsonJob.get(key).toString());
amContainerResource.setResourceValue(info.getName(), value);
}
} }
if (jsonJob.containsKey(SLSConfiguration.AM_VCORES)) {
amContainerResource.setVirtualCores(
Integer.parseInt(jsonJob.get(SLSConfiguration.AM_VCORES).toString()));
}
return amContainerResource; return amContainerResource;
} }
@ -777,8 +803,8 @@ public class SLSRunner extends Configured implements Tool {
// node // node
LOG.info("------------------------------------"); LOG.info("------------------------------------");
LOG.info("# nodes = {}, # racks = {}, capacity " + LOG.info("# nodes = {}, # racks = {}, capacity " +
"of each node {} MB memory and {} vcores.", "of each node {}.",
numNMs, numRacks, nmMemoryMB, nmVCores); numNMs, numRacks, nodeManagerResource);
LOG.info("------------------------------------"); LOG.info("------------------------------------");
// job // job
LOG.info("# applications = {}, # total " + LOG.info("# applications = {}, # total " +
@ -804,8 +830,10 @@ public class SLSRunner extends Configured implements Tool {
// package these information in the simulateInfoMap used by other places // package these information in the simulateInfoMap used by other places
simulateInfoMap.put("Number of racks", numRacks); simulateInfoMap.put("Number of racks", numRacks);
simulateInfoMap.put("Number of nodes", numNMs); simulateInfoMap.put("Number of nodes", numNMs);
simulateInfoMap.put("Node memory (MB)", nmMemoryMB); simulateInfoMap.put("Node memory (MB)",
simulateInfoMap.put("Node VCores", nmVCores); nodeManagerResource.getResourceValue(ResourceInformation.MEMORY_URI));
simulateInfoMap.put("Node VCores",
nodeManagerResource.getResourceValue(ResourceInformation.VCORES_URI));
simulateInfoMap.put("Number of applications", numAMs); simulateInfoMap.put("Number of applications", numAMs);
simulateInfoMap.put("Number of tasks", numTasks); simulateInfoMap.put("Number of tasks", numTasks);
simulateInfoMap.put("Average tasks per applicaion", simulateInfoMap.put("Average tasks per applicaion",

View File

@ -54,6 +54,7 @@ public class SLSConfiguration {
public static final int NM_MEMORY_MB_DEFAULT = 10240; public static final int NM_MEMORY_MB_DEFAULT = 10240;
public static final String NM_VCORES = NM_PREFIX + "vcores"; public static final String NM_VCORES = NM_PREFIX + "vcores";
public static final int NM_VCORES_DEFAULT = 10; public static final int NM_VCORES_DEFAULT = 10;
public static final int NM_RESOURCE_DEFAULT = 0;
public static final String NM_HEARTBEAT_INTERVAL_MS = NM_PREFIX public static final String NM_HEARTBEAT_INTERVAL_MS = NM_PREFIX
+ "heartbeat.interval.ms"; + "heartbeat.interval.ms";
public static final int NM_HEARTBEAT_INTERVAL_MS_DEFAULT = 1000; public static final int NM_HEARTBEAT_INTERVAL_MS_DEFAULT = 1000;
@ -65,12 +66,10 @@ public class SLSConfiguration {
public static final String AM_TYPE = AM_PREFIX + "type"; public static final String AM_TYPE = AM_PREFIX + "type";
public static final String AM_TYPE_PREFIX = AM_TYPE + "."; public static final String AM_TYPE_PREFIX = AM_TYPE + ".";
public static final String AM_MEMORY = AM_PREFIX + "memory";
public static final String AM_CONTAINER_MEMORY = AM_PREFIX + public static final String AM_CONTAINER_MEMORY = AM_PREFIX +
"container.memory"; "container.memory";
public static final int AM_CONTAINER_MEMORY_DEFAULT = 1024; public static final int AM_CONTAINER_MEMORY_DEFAULT = 1024;
public static final String AM_VCORES = AM_PREFIX + "vcores";
public static final String AM_CONTAINER_VCORES = AM_PREFIX + public static final String AM_CONTAINER_VCORES = AM_PREFIX +
"container.vcores"; "container.vcores";
public static final int AM_CONTAINER_VCORES_DEFAULT = 1; public static final int AM_CONTAINER_VCORES_DEFAULT = 1;
@ -104,8 +103,10 @@ public class SLSConfiguration {
public static final String JOB_USER = JOB_PREFIX + "user"; public static final String JOB_USER = JOB_PREFIX + "user";
public static final String JOB_COUNT = JOB_PREFIX + "count"; public static final String JOB_COUNT = JOB_PREFIX + "count";
public static final String JOB_TASKS = JOB_PREFIX + "tasks"; public static final String JOB_TASKS = JOB_PREFIX + "tasks";
public static final String JOB_AM_PREFIX = "am.";
// task // task
public static final String TASK_PREFIX = "container.";
public static final String COUNT = "count"; public static final String COUNT = "count";
public static final String TASK_CONTAINER = "container."; public static final String TASK_CONTAINER = "container.";
public static final String TASK_HOST = TASK_CONTAINER + "host"; public static final String TASK_HOST = TASK_CONTAINER + "host";
@ -115,6 +116,5 @@ public class SLSConfiguration {
public static final String TASK_DURATION_MS = TASK_CONTAINER + DURATION_MS; public static final String TASK_DURATION_MS = TASK_CONTAINER + DURATION_MS;
public static final String TASK_PRIORITY = TASK_CONTAINER + "priority"; public static final String TASK_PRIORITY = TASK_CONTAINER + "priority";
public static final String TASK_TYPE = TASK_CONTAINER + "type"; public static final String TASK_TYPE = TASK_CONTAINER + "type";
public static final String TASK_MEMORY = TASK_CONTAINER + "memory";
} }

View File

@ -35,24 +35,23 @@ import org.apache.hadoop.yarn.api.records.ContainerExitStatus;
import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerState; import org.apache.hadoop.yarn.api.records.ContainerState;
import org.apache.hadoop.yarn.api.records.ContainerStatus; import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatRequest; import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatRequest;
import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse; import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse;
import org.apache.hadoop.yarn.server.api.protocolrecords import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerRequest;
.RegisterNodeManagerRequest; import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerResponse;
import org.apache.hadoop.yarn.server.api.protocolrecords
.RegisterNodeManagerResponse;
import org.apache.hadoop.yarn.server.api.records.MasterKey; import org.apache.hadoop.yarn.server.api.records.MasterKey;
import org.apache.hadoop.yarn.server.api.records.NodeAction; import org.apache.hadoop.yarn.server.api.records.NodeAction;
import org.apache.hadoop.yarn.server.api.records.NodeHealthStatus; import org.apache.hadoop.yarn.server.api.records.NodeHealthStatus;
import org.apache.hadoop.yarn.server.api.records.NodeStatus; import org.apache.hadoop.yarn.server.api.records.NodeStatus;
import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager; import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
import org.apache.hadoop.yarn.server.utils.BuilderUtils;
import org.apache.hadoop.yarn.util.Records; import org.apache.hadoop.yarn.util.Records;
import org.apache.hadoop.yarn.sls.scheduler.ContainerSimulator; import org.apache.hadoop.yarn.sls.scheduler.ContainerSimulator;
import org.apache.hadoop.yarn.sls.scheduler.TaskRunner; import org.apache.hadoop.yarn.sls.scheduler.TaskRunner;
import org.apache.hadoop.yarn.sls.utils.SLSUtils; import org.apache.hadoop.yarn.sls.utils.SLSUtils;
import org.apache.hadoop.yarn.util.resource.Resources;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
@ -75,7 +74,7 @@ public class NMSimulator extends TaskRunner.Task {
private int RESPONSE_ID = 1; private int RESPONSE_ID = 1;
private final static Logger LOG = LoggerFactory.getLogger(NMSimulator.class); private final static Logger LOG = LoggerFactory.getLogger(NMSimulator.class);
public void init(String nodeIdStr, int memory, int cores, public void init(String nodeIdStr, Resource nodeResource,
int dispatchTime, int heartBeatInterval, ResourceManager rm) int dispatchTime, int heartBeatInterval, ResourceManager rm)
throws IOException, YarnException { throws IOException, YarnException {
super.init(dispatchTime, dispatchTime + 1000000L * heartBeatInterval, super.init(dispatchTime, dispatchTime + 1000000L * heartBeatInterval,
@ -83,7 +82,7 @@ public class NMSimulator extends TaskRunner.Task {
// create resource // create resource
String rackHostName[] = SLSUtils.getRackHostName(nodeIdStr); String rackHostName[] = SLSUtils.getRackHostName(nodeIdStr);
this.node = NodeInfo.newNodeInfo(rackHostName[0], rackHostName[1], this.node = NodeInfo.newNodeInfo(rackHostName[0], rackHostName[1],
BuilderUtils.newResource(memory, cores)); Resources.clone(nodeResource));
this.rm = rm; this.rm = rm;
// init data structures // init data structures
completedContainerList = completedContainerList =

View File

@ -315,6 +315,54 @@ The SYNTH mode of SLS is very convenient to generate very large loads without th
files. This allows to easily explore wide range of use cases (e.g., imagine simulating 100k jobs, and in different files. This allows to easily explore wide range of use cases (e.g., imagine simulating 100k jobs, and in different
runs simply tune the average number of mappers, or average task duration), in an efficient and compact way. runs simply tune the average number of mappers, or average task duration), in an efficient and compact way.
Resource Type in SLS
------------------------
This section talks about how to use resource type in SLS.
## Configure Resource Manager
This is the same to how to configure resource type for a real cluster. Configure
item `yarn.resource-types` in yarn-site.xml as the following example does.
<property>
<name>yarn.resource-types</name>
<value>resource-type1, resource-type2</value>
</property>
## Configure Node Manager
Specify the size of resource in each node by adding relevant items into
sls-runner.xml like the following example does. The values apply for every node
in SLS. The default values for resources other than memory and vcores are 0.
<property>
<name>yarn.sls.nm.resource-type1</name>
<value>10</value>
</property>
<property>
<name>yarn.sls.nm.resource-type2</name>
<value>10</value>
</property>
## Specify Resource in SLS JSON input
Resource Type is supported in SLS JSON input format, but not in other two
formats(SYNTH and RUMEN). To make it work in SLS JSON input format, you can
specify resource sizes for both task containers and the AM container. Here is an
example.
{
"job.start.ms" : 0,
"am.memory-mb": 2048,
"am.vcores": 2,
"am.resource-type1": 2,
"am.resource-type2": 2,
"job.tasks" : [ {
"container.duration.ms": 5000
"container.memory-mb": 1024,
"container.vcores": 1,
"container.resource-type1": 1,
"container.resource-type2": 1
}
}
Appendix Appendix
-------- --------

View File

@ -75,7 +75,8 @@ public class TestNMSimulator {
public void testNMSimulator() throws Exception { public void testNMSimulator() throws Exception {
// Register one node // Register one node
NMSimulator node1 = new NMSimulator(); NMSimulator node1 = new NMSimulator();
node1.init("/rack1/node1", GB * 10, 10, 0, 1000, rm); node1.init("/rack1/node1", Resources.createResource(GB * 10, 10), 0, 1000,
rm);
node1.middleStep(); node1.middleStep();
int numClusterNodes = rm.getResourceScheduler().getNumClusterNodes(); int numClusterNodes = rm.getResourceScheduler().getNumClusterNodes();