YARN-7413. Support resource type in SLS (Contributed by Yufei Gu via Daniel Templeton)
Change-Id: Ic0a897c123c5d2f57aae757ca6bcf1dad7b90d2b
parent 462f6c490e
commit ba8136615a

@@ -62,6 +62,7 @@ import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.NodeState;
import org.apache.hadoop.yarn.api.records.ReservationId;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceInformation;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
@@ -84,6 +85,7 @@ import org.apache.hadoop.yarn.sls.synthetic.SynthJob;
import org.apache.hadoop.yarn.sls.synthetic.SynthTraceJobProducer;
import org.apache.hadoop.yarn.sls.utils.SLSUtils;
import org.apache.hadoop.yarn.util.UTCClock;
import org.apache.hadoop.yarn.util.resource.ResourceUtils;
import org.apache.hadoop.yarn.util.resource.Resources;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -99,7 +101,7 @@ public class SLSRunner extends Configured implements Tool {

  // NM simulator
  private HashMap<NodeId, NMSimulator> nmMap;
  private int nmMemoryMB, nmVCores;
  private Resource nodeManagerResource;
  private String nodeFile;

  // AM simulator
@@ -178,6 +180,30 @@ public class SLSRunner extends Configured implements Tool {
        amClassMap.put(amType, Class.forName(tempConf.get(key)));
      }
    }

    nodeManagerResource = getNodeManagerResource();
  }

  private Resource getNodeManagerResource() {
    Resource resource = Resources.createResource(0);
    ResourceInformation[] infors = ResourceUtils.getResourceTypesArray();
    for (ResourceInformation info : infors) {
      long value;
      if (info.getName().equals(ResourceInformation.MEMORY_URI)) {
        value = getConf().getInt(SLSConfiguration.NM_MEMORY_MB,
            SLSConfiguration.NM_MEMORY_MB_DEFAULT);
      } else if (info.getName().equals(ResourceInformation.VCORES_URI)) {
        value = getConf().getInt(SLSConfiguration.NM_VCORES,
            SLSConfiguration.NM_VCORES_DEFAULT);
      } else {
        value = getConf().getLong(SLSConfiguration.NM_PREFIX +
            info.getName(), SLSConfiguration.NM_RESOURCE_DEFAULT);
      }

      resource.setResourceValue(info.getName(), value);
    }

    return resource;
  }

  /**
@@ -261,10 +287,6 @@ public class SLSRunner extends Configured implements Tool {

  private void startNM() throws YarnException, IOException {
    // nm configuration
    nmMemoryMB = getConf().getInt(SLSConfiguration.NM_MEMORY_MB,
        SLSConfiguration.NM_MEMORY_MB_DEFAULT);
    nmVCores = getConf().getInt(SLSConfiguration.NM_VCORES,
        SLSConfiguration.NM_VCORES_DEFAULT);
    int heartbeatInterval =
        getConf().getInt(SLSConfiguration.NM_HEARTBEAT_INTERVAL_MS,
            SLSConfiguration.NM_HEARTBEAT_INTERVAL_MS_DEFAULT);
@@ -304,7 +326,7 @@ public class SLSRunner extends Configured implements Tool {
    for (String hostName : nodeSet) {
      // we randomize the heartbeat start time from zero to 1 interval
      NMSimulator nm = new NMSimulator();
      nm.init(hostName, nmMemoryMB, nmVCores, random.nextInt(heartbeatInterval),
      nm.init(hostName, nodeManagerResource, random.nextInt(heartbeatInterval),
          heartbeatInterval, rm);
      nmMap.put(nm.getNode().getNodeID(), nm);
      runner.schedule(nm);
@@ -460,18 +482,7 @@ public class SLSRunner extends Configured implements Tool {
          + " to 0!");
    }

    Resource res = getDefaultContainerResource();
    if (jsonTask.containsKey(SLSConfiguration.TASK_MEMORY)) {
      int containerMemory = Integer.parseInt(
          jsonTask.get(SLSConfiguration.TASK_MEMORY).toString());
      res.setMemorySize(containerMemory);
    }

    if (jsonTask.containsKey(SLSConfiguration.CONTAINER_VCORES)) {
      int containerVCores = Integer.parseInt(
          jsonTask.get(SLSConfiguration.CONTAINER_VCORES).toString());
      res.setVirtualCores(containerVCores);
    }
    Resource res = getResourceForContainer(jsonTask);

    int priority = DEFAULT_MAPPER_PRIORITY;
    if (jsonTask.containsKey(SLSConfiguration.TASK_PRIORITY)) {
@@ -500,6 +511,21 @@ public class SLSRunner extends Configured implements Tool {
    return containers;
  }

  private Resource getResourceForContainer(Map jsonTask) {
    Resource res = getDefaultContainerResource();
    ResourceInformation[] infors = ResourceUtils.getResourceTypesArray();
    for (ResourceInformation info : infors) {
      if (jsonTask.containsKey(SLSConfiguration.TASK_PREFIX + info.getName())) {
        long value = Long.parseLong(
            jsonTask.get(SLSConfiguration.TASK_PREFIX + info.getName())
                .toString());
        res.setResourceValue(info.getName(), value);
      }
    }

    return res;
  }

  /**
   * Parse workload from a rumen trace file.
   */
@@ -717,15 +743,15 @@ public class SLSRunner extends Configured implements Tool {
      return amContainerResource;
    }

    if (jsonJob.containsKey(SLSConfiguration.AM_MEMORY)) {
      amContainerResource.setMemorySize(
          Long.parseLong(jsonJob.get(SLSConfiguration.AM_MEMORY).toString()));
    ResourceInformation[] infors = ResourceUtils.getResourceTypesArray();
    for (ResourceInformation info : infors) {
      String key = SLSConfiguration.JOB_AM_PREFIX + info.getName();
      if (jsonJob.containsKey(key)) {
        long value = Long.parseLong(jsonJob.get(key).toString());
        amContainerResource.setResourceValue(info.getName(), value);
      }
    }

    if (jsonJob.containsKey(SLSConfiguration.AM_VCORES)) {
      amContainerResource.setVirtualCores(
          Integer.parseInt(jsonJob.get(SLSConfiguration.AM_VCORES).toString()));
    }
    return amContainerResource;
  }
@@ -777,8 +803,8 @@ public class SLSRunner extends Configured implements Tool {
    // node
    LOG.info("------------------------------------");
    LOG.info("# nodes = {}, # racks = {}, capacity " +
        "of each node {} MB memory and {} vcores.",
        numNMs, numRacks, nmMemoryMB, nmVCores);
        "of each node {}.",
        numNMs, numRacks, nodeManagerResource);
    LOG.info("------------------------------------");
    // job
    LOG.info("# applications = {}, # total " +
@@ -804,8 +830,10 @@ public class SLSRunner extends Configured implements Tool {
    // package these information in the simulateInfoMap used by other places
    simulateInfoMap.put("Number of racks", numRacks);
    simulateInfoMap.put("Number of nodes", numNMs);
    simulateInfoMap.put("Node memory (MB)", nmMemoryMB);
    simulateInfoMap.put("Node VCores", nmVCores);
    simulateInfoMap.put("Node memory (MB)",
        nodeManagerResource.getResourceValue(ResourceInformation.MEMORY_URI));
    simulateInfoMap.put("Node VCores",
        nodeManagerResource.getResourceValue(ResourceInformation.VCORES_URI));
    simulateInfoMap.put("Number of applications", numAMs);
    simulateInfoMap.put("Number of tasks", numTasks);
    simulateInfoMap.put("Average tasks per applicaion",

@@ -54,6 +54,7 @@ public class SLSConfiguration {
  public static final int NM_MEMORY_MB_DEFAULT = 10240;
  public static final String NM_VCORES = NM_PREFIX + "vcores";
  public static final int NM_VCORES_DEFAULT = 10;
  public static final int NM_RESOURCE_DEFAULT = 0;
  public static final String NM_HEARTBEAT_INTERVAL_MS = NM_PREFIX
      + "heartbeat.interval.ms";
  public static final int NM_HEARTBEAT_INTERVAL_MS_DEFAULT = 1000;
@@ -65,12 +66,10 @@ public class SLSConfiguration {
  public static final String AM_TYPE = AM_PREFIX + "type";
  public static final String AM_TYPE_PREFIX = AM_TYPE + ".";

  public static final String AM_MEMORY = AM_PREFIX + "memory";
  public static final String AM_CONTAINER_MEMORY = AM_PREFIX +
      "container.memory";
  public static final int AM_CONTAINER_MEMORY_DEFAULT = 1024;

  public static final String AM_VCORES = AM_PREFIX + "vcores";
  public static final String AM_CONTAINER_VCORES = AM_PREFIX +
      "container.vcores";
  public static final int AM_CONTAINER_VCORES_DEFAULT = 1;
@@ -104,8 +103,10 @@ public class SLSConfiguration {
  public static final String JOB_USER = JOB_PREFIX + "user";
  public static final String JOB_COUNT = JOB_PREFIX + "count";
  public static final String JOB_TASKS = JOB_PREFIX + "tasks";
  public static final String JOB_AM_PREFIX = "am.";

  // task
  public static final String TASK_PREFIX = "container.";
  public static final String COUNT = "count";
  public static final String TASK_CONTAINER = "container.";
  public static final String TASK_HOST = TASK_CONTAINER + "host";
@@ -115,6 +116,5 @@ public class SLSConfiguration {
  public static final String TASK_DURATION_MS = TASK_CONTAINER + DURATION_MS;
  public static final String TASK_PRIORITY = TASK_CONTAINER + "priority";
  public static final String TASK_TYPE = TASK_CONTAINER + "type";
  public static final String TASK_MEMORY = TASK_CONTAINER + "memory";

}

@@ -35,24 +35,23 @@ import org.apache.hadoop.yarn.api.records.ContainerExitStatus;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerState;
import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatRequest;
import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse;
import org.apache.hadoop.yarn.server.api.protocolrecords
    .RegisterNodeManagerRequest;
import org.apache.hadoop.yarn.server.api.protocolrecords
    .RegisterNodeManagerResponse;
import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerRequest;
import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerResponse;
import org.apache.hadoop.yarn.server.api.records.MasterKey;
import org.apache.hadoop.yarn.server.api.records.NodeAction;
import org.apache.hadoop.yarn.server.api.records.NodeHealthStatus;
import org.apache.hadoop.yarn.server.api.records.NodeStatus;
import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
import org.apache.hadoop.yarn.server.utils.BuilderUtils;
import org.apache.hadoop.yarn.util.Records;
import org.apache.hadoop.yarn.sls.scheduler.ContainerSimulator;
import org.apache.hadoop.yarn.sls.scheduler.TaskRunner;
import org.apache.hadoop.yarn.sls.utils.SLSUtils;
import org.apache.hadoop.yarn.util.resource.Resources;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -75,15 +74,15 @@ public class NMSimulator extends TaskRunner.Task {
  private int RESPONSE_ID = 1;
  private final static Logger LOG = LoggerFactory.getLogger(NMSimulator.class);

  public void init(String nodeIdStr, int memory, int cores,
  public void init(String nodeIdStr, Resource nodeResource,
      int dispatchTime, int heartBeatInterval, ResourceManager rm)
      throws IOException, YarnException {
    super.init(dispatchTime, dispatchTime + 1000000L * heartBeatInterval,
        heartBeatInterval);
    // create resource
    String rackHostName[] = SLSUtils.getRackHostName(nodeIdStr);
    this.node = NodeInfo.newNodeInfo(rackHostName[0], rackHostName[1],
        BuilderUtils.newResource(memory, cores));
    this.node = NodeInfo.newNodeInfo(rackHostName[0], rackHostName[1],
        Resources.clone(nodeResource));
    this.rm = rm;
    // init data structures
    completedContainerList =

@@ -315,6 +315,54 @@ The SYNTH mode of SLS is very convenient to generate very large loads without th
files. This allows to easily explore wide range of use cases (e.g., imagine simulating 100k jobs, and in different
runs simply tune the average number of mappers, or average task duration), in an efficient and compact way.

Resource Type in SLS
------------------------
This section describes how to use resource types in SLS.

## Configure Resource Manager
This is the same as configuring resource types for a real cluster. Configure the
`yarn.resource-types` property in yarn-site.xml, as in the following example.

    <property>
      <name>yarn.resource-types</name>
      <value>resource-type1, resource-type2</value>
    </property>

## Configure Node Manager
Specify the size of each resource on a node by adding the relevant properties to
sls-runner.xml, as in the following example. The values apply to every node in
SLS. The default value for resources other than memory and vcores is 0.

    <property>
      <name>yarn.sls.nm.resource-type1</name>
      <value>10</value>
    </property>
    <property>
      <name>yarn.sls.nm.resource-type2</name>
      <value>10</value>
    </property>
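
Internally, the runner builds each simulated node's `Resource` by iterating the
registered resource types and reading `yarn.sls.nm.<resource-name>` for each one
(see `getNodeManagerResource()` in this patch). Below is a simplified sketch of
that lookup; the class and method names are illustrative only, while the property
names and defaults are taken from `SLSConfiguration`.

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.yarn.api.records.Resource;
    import org.apache.hadoop.yarn.api.records.ResourceInformation;
    import org.apache.hadoop.yarn.util.resource.ResourceUtils;
    import org.apache.hadoop.yarn.util.resource.Resources;

    public class NodeResourceSketch {
      /** Build the per-node Resource from sls-runner.xml settings. */
      public static Resource buildNodeResource(Configuration conf) {
        Resource resource = Resources.createResource(0);
        for (ResourceInformation info : ResourceUtils.getResourceTypesArray()) {
          long value;
          if (info.getName().equals(ResourceInformation.MEMORY_URI)) {
            // yarn.sls.nm.memory.mb, default 10240
            value = conf.getInt("yarn.sls.nm.memory.mb", 10240);
          } else if (info.getName().equals(ResourceInformation.VCORES_URI)) {
            // yarn.sls.nm.vcores, default 10
            value = conf.getInt("yarn.sls.nm.vcores", 10);
          } else {
            // Custom types fall back to yarn.sls.nm.<resource-name>, default 0.
            value = conf.getLong("yarn.sls.nm." + info.getName(), 0);
          }
          resource.setResourceValue(info.getName(), value);
        }
        return resource;
      }
    }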

## Specify Resource in SLS JSON input
Resource types are supported in the SLS JSON input format, but not in the other
two formats (SYNTH and RUMEN). In the SLS JSON input format you can specify
resource sizes for both the task containers and the AM container. Here is an
example.

    {
      "job.start.ms" : 0,
      "am.memory-mb": 2048,
      "am.vcores": 2,
      "am.resource-type1": 2,
      "am.resource-type2": 2,
      "job.tasks" : [ {
        "container.duration.ms": 5000,
        "container.memory-mb": 1024,
        "container.vcores": 1,
        "container.resource-type1": 1,
        "container.resource-type2": 1
      } ]
    }
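
Each `container.<resource-name>` key overrides the corresponding value of the
default container resource, and the `am.<resource-name>` keys do the same for
the AM container (the `JOB_AM_PREFIX` constant in this patch). Below is a
simplified sketch of that per-task lookup, mirroring `getResourceForContainer()`;
the class and method names are illustrative only.

    import java.util.Map;
    import org.apache.hadoop.yarn.api.records.Resource;
    import org.apache.hadoop.yarn.api.records.ResourceInformation;
    import org.apache.hadoop.yarn.util.resource.ResourceUtils;
    import org.apache.hadoop.yarn.util.resource.Resources;

    public class TaskResourceSketch {
      /** Apply "container.<resource-name>" overrides on top of the defaults. */
      public static Resource resourceForTask(Map<String, ?> jsonTask,
          Resource defaultContainerResource) {
        Resource res = Resources.clone(defaultContainerResource);
        for (ResourceInformation info : ResourceUtils.getResourceTypesArray()) {
          String key = "container." + info.getName(); // TASK_PREFIX + name
          if (jsonTask.containsKey(key)) {
            res.setResourceValue(info.getName(),
                Long.parseLong(jsonTask.get(key).toString()));
          }
        }
        return res;
      }
    }

Note that memory and vcores are handled by the same loop, because the memory
resource is named "memory-mb" (`ResourceInformation.MEMORY_URI`) and the CPU
resource is named "vcores" (`ResourceInformation.VCORES_URI`), which is why the
JSON keys above are `container.memory-mb` and `container.vcores`.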

Appendix
--------

@@ -75,7 +75,8 @@ public class TestNMSimulator {
  public void testNMSimulator() throws Exception {
    // Register one node
    NMSimulator node1 = new NMSimulator();
    node1.init("/rack1/node1", GB * 10, 10, 0, 1000, rm);
    node1.init("/rack1/node1", Resources.createResource(GB * 10, 10), 0, 1000,
        rm);
    node1.middleStep();

    int numClusterNodes = rm.getResourceScheduler().getNumClusterNodes();