YARN-6423. Queue metrics doesn't work for Fair Scheduler in SLS (yufeigu via rkanter)

This commit is contained in:
Robert Kanter 2017-04-25 16:25:56 -07:00
parent e4321ec843
commit 475f933b41
5 changed files with 226 additions and 202 deletions

View File

@ -69,12 +69,15 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fifo.FifoScheduler;
import org.apache.hadoop.yarn.server.utils.BuilderUtils;
import org.apache.hadoop.yarn.sls.appmaster.AMSimulator;
import org.apache.hadoop.yarn.sls.conf.SLSConfiguration;
import org.apache.hadoop.yarn.sls.nodemanager.NMSimulator;
import org.apache.hadoop.yarn.sls.resourcemanager.MockAMLauncher;
import org.apache.hadoop.yarn.sls.scheduler.*;
import org.apache.hadoop.yarn.sls.scheduler.SLSCapacityScheduler;
import org.apache.hadoop.yarn.sls.scheduler.TaskRunner;
import org.apache.hadoop.yarn.sls.scheduler.SLSFairScheduler;
import org.apache.hadoop.yarn.sls.scheduler.ContainerSimulator;
import org.apache.hadoop.yarn.sls.scheduler.SchedulerWrapper;
import org.apache.hadoop.yarn.sls.synthetic.SynthJob;
import org.apache.hadoop.yarn.sls.synthetic.SynthTraceJobProducer;
import org.apache.hadoop.yarn.sls.utils.SLSUtils;
@ -197,10 +200,6 @@ public class SLSRunner extends Configured implements Tool {
Configuration rmConf = new YarnConfiguration(getConf());
String schedulerClass = rmConf.get(YarnConfiguration.RM_SCHEDULER);
// For CapacityScheduler we use a sub-classing instead of wrapping
// to allow scheduler-specific invocations from monitors to work
// this can be used for other schedulers as well if we care to
// exercise/track behaviors that are not common to the scheduler api
if (Class.forName(schedulerClass) == CapacityScheduler.class) {
rmConf.set(YarnConfiguration.RM_SCHEDULER,
SLSCapacityScheduler.class.getName());
@ -300,226 +299,207 @@ public class SLSRunner extends Configured implements Tool {
@SuppressWarnings("unchecked")
private void startAM() throws YarnException, IOException {
// application/container configuration
int heartbeatInterval =
getConf().getInt(SLSConfiguration.AM_HEARTBEAT_INTERVAL_MS,
SLSConfiguration.AM_HEARTBEAT_INTERVAL_MS_DEFAULT);
int containerMemoryMB =
getConf().getInt(SLSConfiguration.CONTAINER_MEMORY_MB,
SLSConfiguration.CONTAINER_MEMORY_MB_DEFAULT);
int containerVCores = getConf().getInt(SLSConfiguration.CONTAINER_VCORES,
SLSConfiguration.CONTAINER_VCORES_DEFAULT);
Resource containerResource =
BuilderUtils.newResource(containerMemoryMB, containerVCores);
// application workload
switch (inputType) {
case SLS:
startAMFromSLSTraces(containerResource, heartbeatInterval);
for (String inputTrace : inputTraces) {
startAMFromSLSTrace(inputTrace);
}
break;
case RUMEN:
startAMFromRumenTraces(containerResource, heartbeatInterval);
long baselineTimeMS = 0;
for (String inputTrace : inputTraces) {
startAMFromRumenTrace(inputTrace, baselineTimeMS);
}
break;
case SYNTH:
startAMFromSynthGenerator(heartbeatInterval);
startAMFromSynthGenerator();
break;
default:
throw new YarnException("Input configuration not recognized, "
+ "trace type should be SLS, RUMEN, or SYNTH");
}
numAMs = amMap.size();
remainingApps = numAMs;
}
/**
* parse workload information from sls trace files
* Parse workload from a SLS trace file.
*/
@SuppressWarnings("unchecked")
private void startAMFromSLSTraces(Resource containerResource,
int heartbeatInterval) throws IOException {
// parse from sls traces
private void startAMFromSLSTrace(String inputTrace) throws IOException {
JsonFactory jsonF = new JsonFactory();
ObjectMapper mapper = new ObjectMapper();
for (String inputTrace : inputTraces) {
Reader input =
new InputStreamReader(new FileInputStream(inputTrace), "UTF-8");
try {
Iterator<Map> i =
mapper.readValues(jsonF.createParser(input), Map.class);
while (i.hasNext()) {
Map jsonJob = i.next();
// load job information
long jobStartTime =
Long.parseLong(jsonJob.get("job.start.ms").toString());
long jobFinishTime =
Long.parseLong(jsonJob.get("job.end.ms").toString());
try (Reader input = new InputStreamReader(
new FileInputStream(inputTrace), "UTF-8")) {
Iterator<Map> jobIter = mapper.readValues(
jsonF.createParser(input), Map.class);
String user = (String) jsonJob.get("job.user");
if (user == null) {
user = "default";
}
String queue = jsonJob.get("job.queue.name").toString();
String oldAppId = jsonJob.get("job.id").toString();
boolean isTracked = trackedApps.contains(oldAppId);
int queueSize =
queueAppNumMap.containsKey(queue) ? queueAppNumMap.get(queue) : 0;
queueSize++;
queueAppNumMap.put(queue, queueSize);
// tasks
List tasks = (List) jsonJob.get("job.tasks");
if (tasks == null || tasks.size() == 0) {
continue;
}
List<ContainerSimulator> containerList =
new ArrayList<ContainerSimulator>();
for (Object o : tasks) {
Map jsonTask = (Map) o;
String hostname = jsonTask.get("container.host").toString();
long taskStart =
Long.parseLong(jsonTask.get("container.start.ms").toString());
long taskFinish =
Long.parseLong(jsonTask.get("container.end.ms").toString());
long lifeTime = taskFinish - taskStart;
// Set memory and vcores from job trace file
Resource res = Resources.clone(containerResource);
if (jsonTask.containsKey("container.memory")) {
int containerMemory =
Integer.parseInt(jsonTask.get("container.memory").toString());
res.setMemorySize(containerMemory);
}
if (jsonTask.containsKey("container.vcores")) {
int containerVCores =
Integer.parseInt(jsonTask.get("container.vcores").toString());
res.setVirtualCores(containerVCores);
}
int priority =
Integer.parseInt(jsonTask.get("container.priority").toString());
String type = jsonTask.get("container.type").toString();
containerList.add(new ContainerSimulator(res, lifeTime, hostname,
priority, type));
}
// create a new AM
String amType = jsonJob.get("am.type").toString();
AMSimulator amSim = (AMSimulator) ReflectionUtils
.newInstance(amClassMap.get(amType), new Configuration());
if (amSim != null) {
amSim.init(AM_ID++, heartbeatInterval, containerList, rm, this,
jobStartTime, jobFinishTime, user, queue, isTracked, oldAppId,
null, runner.getStartTimeMS());
runner.schedule(amSim);
maxRuntime = Math.max(maxRuntime, jobFinishTime);
numTasks += containerList.size();
amMap.put(oldAppId, amSim);
}
while (jobIter.hasNext()) {
try {
createAMForJob(jobIter.next());
} catch (Exception e) {
LOG.error("Failed to create an AM: " + e.getMessage());
}
} finally {
input.close();
}
}
}
private void createAMForJob(Map jsonJob) throws YarnException {
long jobStartTime = Long.parseLong(jsonJob.get("job.start.ms").toString());
long jobFinishTime = Long.parseLong(jsonJob.get("job.end.ms").toString());
String user = (String) jsonJob.get("job.user");
if (user == null) {
user = "default";
}
String queue = jsonJob.get("job.queue.name").toString();
increaseQueueAppNum(queue);
String oldAppId = jsonJob.get("job.id").toString();
// tasks
List tasks = (List) jsonJob.get("job.tasks");
if (tasks == null || tasks.size() == 0) {
throw new YarnException("No task for the job!");
}
List<ContainerSimulator> containerList = new ArrayList<>();
for (Object o : tasks) {
Map jsonTask = (Map) o;
String hostname = jsonTask.get("container.host").toString();
long taskStart = Long.parseLong(jsonTask.get("container.start.ms")
.toString());
long taskFinish = Long.parseLong(jsonTask.get("container.end.ms")
.toString());
long lifeTime = taskFinish - taskStart;
// Set memory and vcores from job trace file
Resource res = getDefaultContainerResource();
if (jsonTask.containsKey("container.memory")) {
int containerMemory =
Integer.parseInt(jsonTask.get("container.memory").toString());
res.setMemorySize(containerMemory);
}
if (jsonTask.containsKey("container.vcores")) {
int containerVCores =
Integer.parseInt(jsonTask.get("container.vcores").toString());
res.setVirtualCores(containerVCores);
}
int priority = Integer.parseInt(jsonTask.get("container.priority")
.toString());
String type = jsonTask.get("container.type").toString();
containerList.add(
new ContainerSimulator(res, lifeTime, hostname, priority, type));
}
// create a new AM
String amType = jsonJob.get("am.type").toString();
runNewAM(amType, user, queue, oldAppId, jobStartTime, jobFinishTime,
containerList, null);
}
/**
* parse workload information from rumen trace files
* Parse workload from a rumen trace file.
*/
@SuppressWarnings("unchecked")
private void startAMFromRumenTraces(Resource containerResource,
int heartbeatInterval) throws IOException {
private void startAMFromRumenTrace(String inputTrace, long baselineTimeMS)
throws IOException {
Configuration conf = new Configuration();
conf.set("fs.defaultFS", "file:///");
long baselineTimeMS = 0;
for (String inputTrace : inputTraces) {
File fin = new File(inputTrace);
JobTraceReader reader =
new JobTraceReader(new Path(fin.getAbsolutePath()), conf);
try {
LoggedJob job = null;
while ((job = reader.getNext()) != null) {
// only support MapReduce currently
String jobType = "mapreduce";
String user =
job.getUser() == null ? "default" : job.getUser().getValue();
String jobQueue = job.getQueue().getValue();
String oldJobId = job.getJobID().toString();
long jobStartTimeMS = job.getSubmitTime();
long jobFinishTimeMS = job.getFinishTime();
if (baselineTimeMS == 0) {
baselineTimeMS = jobStartTimeMS;
}
jobStartTimeMS -= baselineTimeMS;
jobFinishTimeMS -= baselineTimeMS;
if (jobStartTimeMS < 0) {
LOG.warn("Warning: reset job " + oldJobId + " start time to 0.");
jobFinishTimeMS = jobFinishTimeMS - jobStartTimeMS;
jobStartTimeMS = 0;
}
File fin = new File(inputTrace);
boolean isTracked = trackedApps.contains(oldJobId);
int queueSize = queueAppNumMap.containsKey(jobQueue)
? queueAppNumMap.get(jobQueue) : 0;
queueSize++;
queueAppNumMap.put(jobQueue, queueSize);
try (JobTraceReader reader = new JobTraceReader(
new Path(fin.getAbsolutePath()), conf)) {
LoggedJob job = reader.getNext();
List<ContainerSimulator> containerList =
new ArrayList<ContainerSimulator>();
// map tasks
for (LoggedTask mapTask : job.getMapTasks()) {
if (mapTask.getAttempts().size() == 0) {
continue;
}
LoggedTaskAttempt taskAttempt =
mapTask.getAttempts().get(mapTask.getAttempts().size() - 1);
String hostname = taskAttempt.getHostName().getValue();
long containerLifeTime =
taskAttempt.getFinishTime() - taskAttempt.getStartTime();
containerList.add(new ContainerSimulator(containerResource,
containerLifeTime, hostname, 10, "map"));
}
// reduce tasks
for (LoggedTask reduceTask : job.getReduceTasks()) {
if (reduceTask.getAttempts().size() == 0) {
continue;
}
LoggedTaskAttempt taskAttempt = reduceTask.getAttempts()
.get(reduceTask.getAttempts().size() - 1);
String hostname = taskAttempt.getHostName().getValue();
long containerLifeTime =
taskAttempt.getFinishTime() - taskAttempt.getStartTime();
containerList.add(new ContainerSimulator(containerResource,
containerLifeTime, hostname, 20, "reduce"));
}
// create a new AM
AMSimulator amSim = (AMSimulator) ReflectionUtils
.newInstance(amClassMap.get(jobType), conf);
if (amSim != null) {
amSim.init(AM_ID++, heartbeatInterval, containerList, rm, this,
jobStartTimeMS, jobFinishTimeMS, user, jobQueue, isTracked,
oldJobId, null, runner.getStartTimeMS());
runner.schedule(amSim);
maxRuntime = Math.max(maxRuntime, jobFinishTimeMS);
numTasks += containerList.size();
amMap.put(oldJobId, amSim);
}
while (job != null) {
try {
createAMForJob(job, baselineTimeMS);
} catch (Exception e) {
LOG.error("Failed to create an AM: " + e.getMessage());
}
} finally {
reader.close();
job = reader.getNext();
}
}
}
private void createAMForJob(LoggedJob job, long baselineTimeMs)
throws YarnException {
String user = job.getUser() == null ? "default" :
job.getUser().getValue();
String jobQueue = job.getQueue().getValue();
String oldJobId = job.getJobID().toString();
long jobStartTimeMS = job.getSubmitTime();
long jobFinishTimeMS = job.getFinishTime();
if (baselineTimeMs == 0) {
baselineTimeMs = job.getSubmitTime();
}
jobStartTimeMS -= baselineTimeMs;
jobFinishTimeMS -= baselineTimeMs;
if (jobStartTimeMS < 0) {
LOG.warn("Warning: reset job " + oldJobId + " start time to 0.");
jobFinishTimeMS = jobFinishTimeMS - jobStartTimeMS;
jobStartTimeMS = 0;
}
increaseQueueAppNum(jobQueue);
List<ContainerSimulator> containerList = new ArrayList<>();
// mapper
for (LoggedTask mapTask : job.getMapTasks()) {
if (mapTask.getAttempts().size() == 0) {
throw new YarnException("Invalid map task, no attempt for a mapper!");
}
LoggedTaskAttempt taskAttempt =
mapTask.getAttempts().get(mapTask.getAttempts().size() - 1);
String hostname = taskAttempt.getHostName().getValue();
long containerLifeTime = taskAttempt.getFinishTime() -
taskAttempt.getStartTime();
containerList.add(
new ContainerSimulator(getDefaultContainerResource(),
containerLifeTime, hostname, 10, "map"));
}
// reducer
for (LoggedTask reduceTask : job.getReduceTasks()) {
if (reduceTask.getAttempts().size() == 0) {
throw new YarnException(
"Invalid reduce task, no attempt for a reducer!");
}
LoggedTaskAttempt taskAttempt =
reduceTask.getAttempts().get(reduceTask.getAttempts().size() - 1);
String hostname = taskAttempt.getHostName().getValue();
long containerLifeTime = taskAttempt.getFinishTime() -
taskAttempt.getStartTime();
containerList.add(
new ContainerSimulator(getDefaultContainerResource(),
containerLifeTime, hostname, 20, "reduce"));
}
// Only supports the default job type currently
runNewAM(SLSUtils.DEFAULT_JOB_TYPE, user, jobQueue, oldJobId,
jobStartTimeMS, jobFinishTimeMS, containerList, null);
}
private Resource getDefaultContainerResource() {
int containerMemory = getConf().getInt(SLSConfiguration.CONTAINER_MEMORY_MB,
SLSConfiguration.CONTAINER_MEMORY_MB_DEFAULT);
int containerVCores = getConf().getInt(SLSConfiguration.CONTAINER_VCORES,
SLSConfiguration.CONTAINER_VCORES_DEFAULT);
return Resources.createResource(containerMemory, containerVCores);
}
/**
* parse workload information from synth-generator trace files.
*/
@SuppressWarnings("unchecked")
private void startAMFromSynthGenerator(int heartbeatInterval)
throws IOException {
private void startAMFromSynthGenerator() throws YarnException, IOException {
Configuration localConf = new Configuration();
localConf.set("fs.defaultFS", "file:///");
long baselineTimeMS = 0;
@ -540,7 +520,6 @@ public class SLSRunner extends Configured implements Tool {
// creation
while ((job = (SynthJob) stjp.getNextJob()) != null) {
// only support MapReduce currently
String jobType = "mapreduce";
String user = job.getUser();
String jobQueue = job.getQueueName();
String oldJobId = job.getJobID().toString();
@ -560,11 +539,7 @@ public class SLSRunner extends Configured implements Tool {
jobStartTimeMS = 0;
}
boolean isTracked = trackedApps.contains(oldJobId);
int queueSize = queueAppNumMap.containsKey(jobQueue)
? queueAppNumMap.get(jobQueue) : 0;
queueSize++;
queueAppNumMap.put(jobQueue, queueSize);
increaseQueueAppNum(jobQueue);
List<ContainerSimulator> containerList =
new ArrayList<ContainerSimulator>();
@ -625,18 +600,9 @@ public class SLSRunner extends Configured implements Tool {
job.getQueueName());
}
// create a new AM
AMSimulator amSim = (AMSimulator) ReflectionUtils
.newInstance(amClassMap.get(jobType), localConf);
if (amSim != null) {
amSim.init(AM_ID++, heartbeatInterval, containerList, rm, this,
jobStartTimeMS, jobFinishTimeMS, user, jobQueue, isTracked,
oldJobId, rr, runner.getStartTimeMS());
runner.schedule(amSim);
maxRuntime = Math.max(maxRuntime, jobFinishTimeMS);
numTasks += containerList.size();
amMap.put(oldJobId, amSim);
}
runNewAM(SLSUtils.DEFAULT_JOB_TYPE, user, jobQueue, oldJobId,
jobStartTimeMS, jobFinishTimeMS, containerList, rr);
}
} finally {
stjp.close();
@ -644,6 +610,42 @@ public class SLSRunner extends Configured implements Tool {
}
private void increaseQueueAppNum(String queue) throws YarnException {
SchedulerWrapper wrapper = (SchedulerWrapper)rm.getResourceScheduler();
String queueName = wrapper.getRealQueueName(queue);
Integer appNum = queueAppNumMap.get(queueName);
if (appNum == null) {
appNum = 1;
} else {
appNum++;
}
queueAppNumMap.put(queueName, appNum);
}
private void runNewAM(String jobType, String user,
String jobQueue, String oldJobId, long jobStartTimeMS,
long jobFinishTimeMS, List<ContainerSimulator> containerList,
ReservationSubmissionRequest rr) {
AMSimulator amSim = (AMSimulator) ReflectionUtils.newInstance(
amClassMap.get(jobType), new Configuration());
if (amSim != null) {
int heartbeatInterval = getConf().getInt(
SLSConfiguration.AM_HEARTBEAT_INTERVAL_MS,
SLSConfiguration.AM_HEARTBEAT_INTERVAL_MS_DEFAULT);
boolean isTracked = trackedApps.contains(oldJobId);
amSim.init(AM_ID++, heartbeatInterval, containerList,
rm, this, jobStartTimeMS, jobFinishTimeMS, user, jobQueue,
isTracked, oldJobId, rr, runner.getStartTimeMS());
runner.schedule(amSim);
maxRuntime = Math.max(maxRuntime, jobFinishTimeMS);
numTasks += containerList.size();
amMap.put(oldJobId, amSim);
}
}
private void printSimulationInfo() {
if (printSimulation) {
// node

View File

@ -35,6 +35,7 @@ import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.UpdatedContainerInfo;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation;
@ -360,4 +361,12 @@ public class SLSCapacityScheduler extends CapacityScheduler implements
public Configuration getConf() {
return conf;
}
public String getRealQueueName(String queue) throws YarnException {
if (getQueue(queue) == null) {
throw new YarnException("Can't find the queue by the given name: " + queue
+ "! Please check if queue " + queue + " is in the allocation file.");
}
return getQueue(queue).getQueueName();
}
}

View File

@ -29,6 +29,7 @@ import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.UpdatedContainerInfo;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation;
@ -333,5 +334,13 @@ public class SLSFairScheduler extends FairScheduler
}
super.serviceStop();
}
public String getRealQueueName(String queue) throws YarnException {
if (!getQueueManager().exists(queue)) {
throw new YarnException("Can't find the queue by the given name: " + queue
+ "! Please check if queue " + queue + " is in the allocation file.");
}
return getQueueManager().getQueue(queue).getQueueName();
}
}

View File

@ -19,10 +19,14 @@ package org.apache.hadoop.yarn.sls.scheduler;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.yarn.exceptions.YarnException;
@Private
@Unstable
public interface SchedulerWrapper {
SchedulerMetrics getSchedulerMetrics();
Tracker getTracker();
String getRealQueueName(String queue) throws YarnException;
}

View File

@ -43,7 +43,7 @@ import org.apache.hadoop.tools.rumen.LoggedTaskAttempt;
@Private
@Unstable
public class SLSUtils {
public static final int SHUTDOWN_HOOK_PRIORITY = 30;
public final static String DEFAULT_JOB_TYPE = "mapreduce";
// hostname includes the network path and the host name. for example
// "/default-rack/hostFoo" or "/coreSwitchA/TORSwitchB/hostBar".