YARN-7929. Support to set container execution type in SLS. (Jiandan Yang via Weiwei Yang)

This commit is contained in:
Weiwei Yang 2018-02-28 17:57:28 +08:00
parent a9c14b1119
commit 7af4f34de5
15 changed files with 141 additions and 38 deletions

View File

@ -55,6 +55,7 @@ import org.apache.hadoop.tools.rumen.LoggedTaskAttempt;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.apache.hadoop.yarn.api.records.ExecutionType;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.NodeState;
import org.apache.hadoop.yarn.api.records.ReservationId;
@ -284,9 +285,12 @@ public class SLSRunner extends Configured implements Tool {
private void startNM() throws YarnException, IOException {
// nm configuration
int heartbeatInterval =
getConf().getInt(SLSConfiguration.NM_HEARTBEAT_INTERVAL_MS,
SLSConfiguration.NM_HEARTBEAT_INTERVAL_MS_DEFAULT);
int heartbeatInterval = getConf().getInt(
SLSConfiguration.NM_HEARTBEAT_INTERVAL_MS,
SLSConfiguration.NM_HEARTBEAT_INTERVAL_MS_DEFAULT);
float resourceUtilizationRatio = getConf().getFloat(
SLSConfiguration.NM_RESOURCE_UTILIZATION_RATIO,
SLSConfiguration.NM_RESOURCE_UTILIZATION_RATIO_DEFAULT);
// nm information (fetch from topology file, or from sls/rumen json file)
Set<String> nodeSet = new HashSet<String>();
if (nodeFile.isEmpty()) {
@ -324,7 +328,7 @@ public class SLSRunner extends Configured implements Tool {
// we randomize the heartbeat start time from zero to 1 interval
NMSimulator nm = new NMSimulator();
nm.init(hostName, nodeManagerResource, random.nextInt(heartbeatInterval),
heartbeatInterval, rm);
heartbeatInterval, rm, resourceUtilizationRatio);
nmMap.put(nm.getNode().getNodeID(), nm);
runner.schedule(nm);
rackSet.add(nm.getNode().getRackName());
@ -499,9 +503,15 @@ public class SLSRunner extends Configured implements Tool {
}
count = Math.max(count, 1);
ExecutionType executionType = ExecutionType.GUARANTEED;
if (jsonTask.containsKey(SLSConfiguration.TASK_EXECUTION_TYPE)) {
executionType = ExecutionType.valueOf(
jsonTask.get(SLSConfiguration.TASK_EXECUTION_TYPE).toString());
}
for (int i = 0; i < count; i++) {
containers.add(
new ContainerSimulator(res, duration, hostname, priority, type));
new ContainerSimulator(res, duration, hostname, priority, type,
executionType));
}
}
@ -670,7 +680,8 @@ public class SLSRunner extends Configured implements Tool {
.newInstance((int) task.getMemory(), (int) task.getVcores());
containerList.add(
new ContainerSimulator(containerResource, containerLifeTime,
hostname, task.getPriority(), task.getType()));
hostname, task.getPriority(), task.getType(),
task.getExecutionType()));
}

View File

@ -47,6 +47,8 @@ import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
import org.apache.hadoop.yarn.api.records.ExecutionType;
import org.apache.hadoop.yarn.api.records.ExecutionTypeRequest;
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.ReservationId;
@ -259,13 +261,16 @@ public abstract class AMSimulator extends TaskRunner.Task {
}
}
protected ResourceRequest createResourceRequest(
Resource resource, String host, int priority, int numContainers) {
protected ResourceRequest createResourceRequest(Resource resource,
ExecutionType executionType, String host, int priority, int
numContainers) {
ResourceRequest request = recordFactory
.newRecordInstance(ResourceRequest.class);
request.setCapability(resource);
request.setResourceName(host);
request.setNumContainers(numContainers);
request.setExecutionTypeRequest(
ExecutionTypeRequest.newInstance(executionType));
Priority prio = recordFactory.newRecordInstance(Priority.class);
prio.setPriority(priority);
request.setPriority(prio);
@ -400,8 +405,8 @@ public abstract class AMSimulator extends TaskRunner.Task {
rackLocalRequestMap.get(rackname).setNumContainers(
rackLocalRequestMap.get(rackname).getNumContainers() + 1);
} else {
ResourceRequest request =
createResourceRequest(cs.getResource(), rackname, priority, 1);
ResourceRequest request = createResourceRequest(cs.getResource(),
cs.getExecutionType(), rackname, priority, 1);
rackLocalRequestMap.put(rackname, request);
}
// check node local
@ -410,15 +415,15 @@ public abstract class AMSimulator extends TaskRunner.Task {
nodeLocalRequestMap.get(hostname).setNumContainers(
nodeLocalRequestMap.get(hostname).getNumContainers() + 1);
} else {
ResourceRequest request =
createResourceRequest(cs.getResource(), hostname, priority, 1);
ResourceRequest request = createResourceRequest(cs.getResource(),
cs.getExecutionType(), hostname, priority, 1);
nodeLocalRequestMap.put(hostname, request);
}
}
// any
if (anyRequest == null) {
anyRequest = createResourceRequest(
cs.getResource(), ResourceRequest.ANY, priority, 1);
anyRequest = createResourceRequest(cs.getResource(),
cs.getExecutionType(), ResourceRequest.ANY, priority, 1);
} else {
anyRequest.setNumContainers(anyRequest.getNumContainers() + 1);
}

View File

@ -62,6 +62,8 @@ public class SLSConfiguration {
public static final String AM_PREFIX = PREFIX + "am.";
public static final String AM_HEARTBEAT_INTERVAL_MS = AM_PREFIX
+ "heartbeat.interval.ms";
public static final String NM_RESOURCE_UTILIZATION_RATIO = NM_PREFIX
+ "resource.utilization.ratio";
public static final int AM_HEARTBEAT_INTERVAL_MS_DEFAULT = 1000;
public static final String AM_TYPE = AM_PREFIX + "type";
public static final String AM_TYPE_PREFIX = AM_TYPE + ".";
@ -74,6 +76,8 @@ public class SLSConfiguration {
"container.vcores";
public static final int AM_CONTAINER_VCORES_DEFAULT = 1;
public static final float NM_RESOURCE_UTILIZATION_RATIO_DEFAULT = -1F;
// container
public static final String CONTAINER_PREFIX = PREFIX + "container.";
public static final String CONTAINER_MEMORY_MB = CONTAINER_PREFIX
@ -116,5 +120,7 @@ public class SLSConfiguration {
public static final String TASK_DURATION_MS = TASK_CONTAINER + DURATION_MS;
public static final String TASK_PRIORITY = TASK_CONTAINER + "priority";
public static final String TASK_TYPE = TASK_CONTAINER + "type";
public static final String TASK_EXECUTION_TYPE = TASK_CONTAINER
+ "execution.type";
}

View File

@ -36,6 +36,7 @@ import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerState;
import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceUtilization;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatRequest;
import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse;
@ -72,18 +73,20 @@ public class NMSimulator extends TaskRunner.Task {
private ResourceManager rm;
// heart beat response id
private int responseId = 0;
private float resourceUtilizationRatio;
private final static Logger LOG = LoggerFactory.getLogger(NMSimulator.class);
public void init(String nodeIdStr, Resource nodeResource,
int dispatchTime, int heartBeatInterval, ResourceManager rm)
throws IOException, YarnException {
public void init(String nodeIdStr, Resource nodeResource, int dispatchTime,
int heartBeatInterval, ResourceManager pRm,
float pResourceUtilizationRatio)
throws IOException, YarnException {
super.init(dispatchTime, dispatchTime + 1000000L * heartBeatInterval,
heartBeatInterval);
heartBeatInterval);
// create resource
String rackHostName[] = SLSUtils.getRackHostName(nodeIdStr);
this.node = NodeInfo.newNodeInfo(rackHostName[0], rackHostName[1],
Resources.clone(nodeResource));
this.rm = rm;
this.rm = pRm;
// init data structures
completedContainerList =
Collections.synchronizedList(new ArrayList<ContainerId>());
@ -100,9 +103,10 @@ public class NMSimulator extends TaskRunner.Task {
req.setNodeId(node.getNodeID());
req.setResource(node.getTotalCapability());
req.setHttpPort(80);
RegisterNodeManagerResponse response = rm.getResourceTrackerService()
RegisterNodeManagerResponse response = this.rm.getResourceTrackerService()
.registerNodeManager(req);
masterKey = response.getNMTokenMasterKey();
this.resourceUtilizationRatio = pResourceUtilizationRatio;
}
@Override
@ -133,6 +137,18 @@ public class NMSimulator extends TaskRunner.Task {
ns.setKeepAliveApplications(new ArrayList<ApplicationId>());
ns.setResponseId(responseId++);
ns.setNodeHealthStatus(NodeHealthStatus.newInstance(true, "", 0));
//set node & containers utilization
if (resourceUtilizationRatio > 0 && resourceUtilizationRatio <=1) {
int pMemUsed = Math.round(node.getTotalCapability().getMemorySize()
* resourceUtilizationRatio);
float cpuUsed = node.getTotalCapability().getVirtualCores()
* resourceUtilizationRatio;
ResourceUtilization resourceUtilization = ResourceUtilization.newInstance(
pMemUsed, pMemUsed, cpuUsed);
ns.setContainersUtilization(resourceUtilization);
ns.setNodeUtilization(resourceUtilization);
}
beatRequest.setNodeStatus(ns);
NodeHeartbeatResponse beatResponse =
rm.getResourceTrackerService().nodeHeartbeat(beatRequest);

View File

@ -24,6 +24,7 @@ import java.util.concurrent.TimeUnit;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ExecutionType;
import org.apache.hadoop.yarn.api.records.Resource;
@Private
@ -43,21 +44,33 @@ public class ContainerSimulator implements Delayed {
private int priority;
// type
private String type;
// execution type
private ExecutionType executionType = ExecutionType.GUARANTEED;
/**
* invoked when AM schedules containers to allocate
* invoked when AM schedules containers to allocate.
*/
public ContainerSimulator(Resource resource, long lifeTime,
String hostname, int priority, String type) {
this(resource, lifeTime, hostname, priority, type,
ExecutionType.GUARANTEED);
}
/**
* invoked when AM schedules containers to allocate.
*/
public ContainerSimulator(Resource resource, long lifeTime,
String hostname, int priority, String type, ExecutionType executionType) {
this.resource = resource;
this.lifeTime = lifeTime;
this.hostname = hostname;
this.priority = priority;
this.type = type;
this.executionType = executionType;
}
/**
* invoke when NM schedules containers to run
* invoke when NM schedules containers to run.
*/
public ContainerSimulator(ContainerId id, Resource resource, long endTime,
long lifeTime) {
@ -114,4 +127,8 @@ public class ContainerSimulator implements Delayed {
public void setPriority(int p) {
priority = p;
}
public ExecutionType getExecutionType() {
return executionType;
}
}

View File

@ -327,7 +327,9 @@ public class SLSCapacityScheduler extends CapacityScheduler implements
@Override
public void serviceStop() throws Exception {
try {
schedulerMetrics.tearDown();
if (metricsON) {
schedulerMetrics.tearDown();
}
} catch (Exception e) {
e.printStackTrace();
}

View File

@ -311,7 +311,9 @@ public class SLSFairScheduler extends FairScheduler
@Override
public void serviceStop() throws Exception {
try {
schedulerMetrics.tearDown();
if (metricsON) {
schedulerMetrics.tearDown();
}
} catch (Exception e) {
e.printStackTrace();
}

View File

@ -32,6 +32,7 @@ import org.apache.hadoop.tools.rumen.ReduceTaskAttemptInfo;
import org.apache.hadoop.tools.rumen.TaskAttemptInfo;
import org.apache.hadoop.tools.rumen.TaskInfo;
import org.apache.hadoop.tools.rumen.Pre21JobHistoryConstants.Values;
import org.apache.hadoop.yarn.api.records.ExecutionType;
import org.apache.hadoop.yarn.sls.appmaster.MRAMSimulator;
import java.util.ArrayList;
@ -92,14 +93,16 @@ public class SynthJob implements JobStory {
private long maxMemory;
private long maxVcores;
private int priority;
private ExecutionType executionType;
private SynthTask(String type, long time, long maxMemory, long maxVcores,
int priority){
int priority, ExecutionType executionType){
this.type = type;
this.time = time;
this.maxMemory = maxMemory;
this.maxVcores = maxVcores;
this.priority = priority;
this.executionType = executionType;
}
public String getType(){
@ -122,11 +125,15 @@ public class SynthJob implements JobStory {
return priority;
}
public ExecutionType getExecutionType() {
return executionType;
}
@Override
public String toString(){
return String.format("[task]\ttype: %1$-10s\ttime: %2$3s\tmemory: "
+ "%3$4s\tvcores: %4$2s%n", getType(), getTime(), getMemory(),
getVcores());
+ "%3$4s\tvcores: %4$2s\texecution_type: %5$-10s%n", getType(),
getTime(), getMemory(), getVcores(), getExecutionType().toString());
}
}
@ -181,6 +188,9 @@ public class SynthJob implements JobStory {
long vcores = task.max_vcores.getLong();
vcores = vcores < MIN_VCORES ? MIN_VCORES : vcores;
int priority = task.priority;
ExecutionType executionType = task.executionType == null
? ExecutionType.GUARANTEED
: ExecutionType.valueOf(task.executionType);
// Save task information by type
taskByType.put(taskType, new ArrayList<>());
@ -192,7 +202,7 @@ public class SynthJob implements JobStory {
long time = task.time.getLong();
totalSlotTime += time;
SynthTask t = new SynthTask(taskType, time, memory, vcores,
priority);
priority, executionType);
tasks.add(t);
taskByType.get(taskType).add(t);
}

View File

@ -27,6 +27,7 @@ import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.tools.rumen.JobStory;
import org.apache.hadoop.tools.rumen.JobStoryProducer;
import org.apache.hadoop.yarn.api.records.ExecutionType;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
import org.apache.hadoop.yarn.sls.appmaster.MRAMSimulator;
import org.codehaus.jackson.annotate.JsonCreator;
@ -199,6 +200,7 @@ public class SynthTraceJobProducer implements JobStoryProducer {
map.max_vcores = new Sample((double) jobDef.map_max_vcores_avg,
jobDef.map_max_vcores_stddev);
map.priority = DEFAULT_MAPPER_PRIORITY;
map.executionType = jobDef.map_execution_type;
jobDef.tasks.add(map);
TaskDefinition reduce = new TaskDefinition();
@ -210,6 +212,7 @@ public class SynthTraceJobProducer implements JobStoryProducer {
reduce.max_vcores = new Sample((double) jobDef.reduce_max_vcores_avg,
jobDef.reduce_max_vcores_stddev);
reduce.priority = DEFAULT_REDUCER_PRIORITY;
reduce.executionType = jobDef.reduce_execution_type;
jobDef.tasks.add(reduce);
} catch (JsonMappingException e) {
@ -425,6 +428,12 @@ public class SynthTraceJobProducer implements JobStoryProducer {
@JsonProperty("reduce_max_vcores_stddev")
double reduce_max_vcores_stddev;
//container execution type
@JsonProperty("map_execution_type")
String map_execution_type = ExecutionType.GUARANTEED.name();
@JsonProperty("reduce_execution_type")
String reduce_execution_type = ExecutionType.GUARANTEED.name();
public void init(JDKRandomGenerator rand){
deadline_factor.init(rand);
duration.init(rand);
@ -464,12 +473,15 @@ public class SynthTraceJobProducer implements JobStoryProducer {
Sample max_vcores;
@JsonProperty("priority")
int priority;
@JsonProperty("execution_type")
String executionType = ExecutionType.GUARANTEED.name();
@Override
public String toString(){
return "\nTaskDefinition " + type
+ " Count[" + count + "] Time[" + time + "] Memory[" + max_memory
+ "] Vcores[" + max_vcores + "] Priority[" + priority + "]";
+ "] Vcores[" + max_vcores + "] Priority[" + priority
+ "] ExecutionType[" + executionType + "]";
}
}

View File

@ -18,6 +18,7 @@
package org.apache.hadoop.yarn.sls;
import org.apache.commons.math3.random.JDKRandomGenerator;
import org.apache.hadoop.yarn.api.records.ExecutionType;
import org.codehaus.jackson.map.JsonMappingException;
import org.codehaus.jackson.map.ObjectMapper;
import org.apache.hadoop.conf.Configuration;
@ -31,6 +32,7 @@ import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.Arrays;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.codehaus.jackson.JsonParser.Feature.INTERN_FIELD_NAMES;
@ -254,6 +256,7 @@ public class TestSynthJobGeneration {
assertTrue(t.getTime() > 0);
assertTrue(t.getMemory() > 0);
assertTrue(t.getVcores() > 0);
assertEquals(ExecutionType.GUARANTEED, t.getExecutionType());
}
}
}

View File

@ -17,6 +17,8 @@
*/
package org.apache.hadoop.yarn.sls.nodemanager;
import com.google.common.base.Supplier;
import org.apache.hadoop.test.GenericTestUtils;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
@ -76,7 +78,7 @@ public class TestNMSimulator {
// Register one node
NMSimulator node1 = new NMSimulator();
node1.init("/rack1/node1", Resources.createResource(GB * 10, 10), 0, 1000,
rm);
rm, -1f);
node1.middleStep();
int numClusterNodes = rm.getResourceScheduler().getNumClusterNodes();
@ -89,6 +91,13 @@ public class TestNMSimulator {
numClusterNodes = rm.getResourceScheduler().getNumClusterNodes();
}
GenericTestUtils.waitFor(new Supplier<Boolean>() {
@Override public Boolean get() {
return rm.getResourceScheduler().getRootQueueMetrics()
.getAvailableMB() > 0;
}
}, 500, 10000);
Assert.assertEquals(1, rm.getResourceScheduler().getNumClusterNodes());
Assert.assertEquals(GB * 10,
rm.getResourceScheduler().getRootQueueMetrics().getAvailableMB());

View File

@ -11,21 +11,24 @@
"container.start.ms": 6664,
"container.end.ms": 23707,
"container.priority": 20,
"container.type": "map"
"container.type": "map",
"container.execution.type": "GUARANTEED"
},
{
"container.host": "/default-rack/node3",
"container.start.ms": 6665,
"container.end.ms": 21593,
"container.priority": 20,
"container.type": "map"
"container.type": "map",
"container.execution.type": "GUARANTEED"
},
{
"container.host": "/default-rack/node2",
"container.start.ms": 68770,
"container.end.ms": 86613,
"container.priority": 20,
"container.type": "map"
"container.type": "map",
"container.execution.type": "GUARANTEED"
}
]
}
@ -42,14 +45,16 @@
"container.start.ms": 111822,
"container.end.ms": 133985,
"container.priority": 20,
"container.type": "map"
"container.type": "map",
"container.execution.type": "GUARANTEED"
},
{
"container.host": "/default-rack/node2",
"container.start.ms": 111788,
"container.end.ms": 131377,
"container.priority": 20,
"container.type": "map"
"container.type": "map",
"container.execution.type": "GUARANTEED"
}
]
}

View File

@ -27,8 +27,10 @@
"rtime_stddev": 4,
"map_max_memory_avg": 1024,
"map_max_memory_stddev": 0.001,
"map_execution_type": "GUARANTEED",
"reduce_max_memory_avg": 2048,
"reduce_max_memory_stddev": 0.001,
"reduce_execution_type": "GUARANTEED",
"map_max_vcores_avg": 1,
"map_max_vcores_stddev": 0.001,
"reduce_max_vcores_avg": 2,

View File

@ -26,7 +26,8 @@
"count": { "val": 5, "std": 1},
"time": {"val": 10, "std": 2},
"max_memory": {"val": 1024},
"max_vcores": {"val": 1}
"max_vcores": {"val": 1},
"execution_type": "GUARANTEED"
},
{
"type": "reduce",
@ -34,7 +35,8 @@
"count": { "val": 5, "std": 1},
"time": {"val": 20, "std": 4},
"max_memory": {"val": 2048},
"max_vcores": {"val": 2}
"max_vcores": {"val": 2},
"execution_type": "GUARANTEED"
}
]
}

View File

@ -26,7 +26,8 @@
"count": { "val": 2},
"time": {"val": 60000},
"max_memory": {"val": 4096},
"max_vcores": {"val": 4}
"max_vcores": {"val": 4},
"execution_type": "GUARANTEED"
}
]
}