YARN-7798. Refactor SLS Reservation Creation. Contributed by Young Chen.

This commit is contained in:
Yufei Gu 2018-01-25 13:06:01 -08:00
parent 0c139d5bcf
commit 16be42d309
5 changed files with 119 additions and 80 deletions

View File

@ -57,7 +57,6 @@ import org.apache.hadoop.tools.rumen.TaskAttemptInfo;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.apache.hadoop.yarn.api.protocolrecords.ReservationSubmissionRequest;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.NodeState;
import org.apache.hadoop.yarn.api.records.ReservationId;
@ -444,7 +443,7 @@ public class SLSRunner extends Configured implements Tool {
for (int i = 0; i < jobCount; i++) {
runNewAM(amType, user, queue, oldAppId, jobStartTime, jobFinishTime,
getTaskContainers(jsonJob), null, getAMContainerResource(jsonJob));
getTaskContainers(jsonJob), getAMContainerResource(jsonJob));
}
}
@ -607,7 +606,7 @@ public class SLSRunner extends Configured implements Tool {
// Only supports the default job type currently
runNewAM(SLSUtils.DEFAULT_JOB_TYPE, user, jobQueue, oldJobId,
jobStartTimeMS, jobFinishTimeMS, containerList, null,
jobStartTimeMS, jobFinishTimeMS, containerList,
getAMContainerResource(null));
}
@ -628,10 +627,6 @@ public class SLSRunner extends Configured implements Tool {
localConf.set("fs.defaultFS", "file:///");
long baselineTimeMS = 0;
// reservations use wall clock time, so need to have a reference for that
UTCClock clock = new UTCClock();
long now = clock.getTime();
try {
// if we use the nodeFile this could have been not initialized yet.
@ -670,13 +665,12 @@ public class SLSRunner extends Configured implements Tool {
ArrayList<NodeId> keyAsArray = new ArrayList<NodeId>(nmMap.keySet());
Random rand = new Random(stjp.getSeed());
Resource maxMapRes = Resource.newInstance(0, 0);
long maxMapDur = 0;
// map tasks
for (int i = 0; i < job.getNumberMaps(); i++) {
TaskAttemptInfo tai = job.getTaskAttemptInfo(TaskType.MAP, i, 0);
RMNode node = nmMap
.get(keyAsArray.get(rand.nextInt(keyAsArray.size()))).getNode();
RMNode node =
nmMap.get(keyAsArray.get(rand.nextInt(keyAsArray.size())))
.getNode();
String hostname = "/" + node.getRackName() + "/" + node.getHostName();
long containerLifeTime = tai.getRuntime();
Resource containerResource =
@ -684,55 +678,39 @@ public class SLSRunner extends Configured implements Tool {
(int) tai.getTaskInfo().getTaskVCores());
containerList.add(new ContainerSimulator(containerResource,
containerLifeTime, hostname, DEFAULT_MAPPER_PRIORITY, "map"));
maxMapRes = Resources.componentwiseMax(maxMapRes, containerResource);
maxMapDur =
containerLifeTime > maxMapDur ? containerLifeTime : maxMapDur;
}
Resource maxRedRes = Resource.newInstance(0, 0);
long maxRedDur = 0;
// reduce tasks
for (int i = 0; i < job.getNumberReduces(); i++) {
TaskAttemptInfo tai = job.getTaskAttemptInfo(TaskType.REDUCE, i, 0);
RMNode node = nmMap
.get(keyAsArray.get(rand.nextInt(keyAsArray.size()))).getNode();
RMNode node =
nmMap.get(keyAsArray.get(rand.nextInt(keyAsArray.size())))
.getNode();
String hostname = "/" + node.getRackName() + "/" + node.getHostName();
long containerLifeTime = tai.getRuntime();
Resource containerResource =
Resource.newInstance((int) tai.getTaskInfo().getTaskMemory(),
(int) tai.getTaskInfo().getTaskVCores());
containerList.add(new ContainerSimulator(containerResource,
containerLifeTime, hostname, DEFAULT_REDUCER_PRIORITY, "reduce"));
maxRedRes = Resources.componentwiseMax(maxRedRes, containerResource);
maxRedDur =
containerLifeTime > maxRedDur ? containerLifeTime : maxRedDur;
containerList.add(
new ContainerSimulator(containerResource, containerLifeTime,
hostname, DEFAULT_REDUCER_PRIORITY, "reduce"));
}
// generating reservations for the jobs that require them
ReservationId reservationId = null;
ReservationSubmissionRequest rr = null;
if (job.hasDeadline()) {
ReservationId reservationId =
reservationId =
ReservationId.newInstance(this.rm.getStartTime(), AM_ID);
rr = ReservationClientUtil.createMRReservation(reservationId,
"reservation_" + AM_ID, maxMapRes, job.getNumberMaps(), maxMapDur,
maxRedRes, job.getNumberReduces(), maxRedDur,
now + jobStartTimeMS, now + job.getDeadline(),
job.getQueueName());
}
runNewAM(SLSUtils.DEFAULT_JOB_TYPE, user, jobQueue, oldJobId,
jobStartTimeMS, jobFinishTimeMS, containerList, rr,
getAMContainerResource(null));
jobStartTimeMS, jobFinishTimeMS, containerList, reservationId,
job.getDeadline(), getAMContainerResource(null));
}
} finally {
stjp.close();
}
}
private Resource getAMContainerResource(Map jsonJob) {
@ -772,7 +750,17 @@ public class SLSRunner extends Configured implements Tool {
private void runNewAM(String jobType, String user,
String jobQueue, String oldJobId, long jobStartTimeMS,
long jobFinishTimeMS, List<ContainerSimulator> containerList,
ReservationSubmissionRequest rr, Resource amContainerResource) {
Resource amContainerResource) {
runNewAM(jobType, user, jobQueue, oldJobId, jobStartTimeMS,
jobFinishTimeMS, containerList, null, -1,
amContainerResource);
}
private void runNewAM(String jobType, String user,
String jobQueue, String oldJobId, long jobStartTimeMS,
long jobFinishTimeMS, List<ContainerSimulator> containerList,
ReservationId reservationId, long deadline,
Resource amContainerResource) {
AMSimulator amSim = (AMSimulator) ReflectionUtils.newInstance(
amClassMap.get(jobType), new Configuration());
@ -787,10 +775,15 @@ public class SLSRunner extends Configured implements Tool {
oldJobId = Integer.toString(AM_ID);
}
AM_ID++;
amSim.init(heartbeatInterval, containerList, rm, this, jobStartTimeMS,
jobFinishTimeMS, user, jobQueue, isTracked, oldJobId, rr,
jobFinishTimeMS, user, jobQueue, isTracked, oldJobId,
runner.getStartTimeMS(), amContainerResource);
if(reservationId != null) {
// if we have a ReservationId, delegate reservation creation to
// AMSim (reservation shape is impl specific)
UTCClock clock = new UTCClock();
amSim.initReservation(reservationId, deadline, clock.getTime());
}
runner.schedule(amSim);
maxRuntime = Math.max(maxRuntime, jobFinishTimeMS);
numTasks += containerList.size();

View File

@ -85,7 +85,7 @@ public abstract class AMSimulator extends TaskRunner.Task {
protected final BlockingQueue<AllocateResponse> responseQueue;
private int responseId = 0;
// user name
protected String user;
private String user;
// queue name
protected String queue;
// am type
@ -105,7 +105,7 @@ public abstract class AMSimulator extends TaskRunner.Task {
// waiting for AM container
volatile boolean isAMContainerRunning = false;
volatile Container amContainer;
private static final Logger LOG = LoggerFactory.getLogger(AMSimulator.class);
private Resource amContainerResource;
@ -120,9 +120,8 @@ public abstract class AMSimulator extends TaskRunner.Task {
public void init(int heartbeatInterval,
List<ContainerSimulator> containerList, ResourceManager resourceManager,
SLSRunner slsRunnner, long startTime, long finishTime, String simUser,
String simQueue, boolean tracked, String oldApp,
ReservationSubmissionRequest rr, long baseTimeMS,
Resource amContainerResource) {
String simQueue, boolean tracked, String oldApp, long baseTimeMS,
Resource amResource) {
super.init(startTime, startTime + 1000000L * heartbeatInterval,
heartbeatInterval);
this.user = simUser;
@ -134,8 +133,7 @@ public abstract class AMSimulator extends TaskRunner.Task {
this.baselineTimeMS = baseTimeMS;
this.traceStartTimeMS = startTime;
this.traceFinishTimeMS = finishTime;
this.reservationRequest = rr;
this.amContainerResource = amContainerResource;
this.amContainerResource = amResource;
}
/**
@ -171,6 +169,10 @@ public abstract class AMSimulator extends TaskRunner.Task {
isAMContainerRunning = true;
}
protected void setReservationRequest(ReservationSubmissionRequest rr){
this.reservationRequest = rr;
}
private ReservationId submitReservationWhenSpecified()
throws IOException, InterruptedException {
if (reservationRequest != null) {
@ -256,7 +258,7 @@ public abstract class AMSimulator extends TaskRunner.Task {
simulateStartTimeMS, simulateFinishTimeMS);
}
}
protected ResourceRequest createResourceRequest(
Resource resource, String host, int priority, int numContainers) {
ResourceRequest request = recordFactory
@ -269,7 +271,7 @@ public abstract class AMSimulator extends TaskRunner.Task {
request.setPriority(prio);
return request;
}
protected AllocateRequest createAllocateRequest(List<ResourceRequest> ask,
List<ContainerId> toRelease) {
AllocateRequest allocateRequest =
@ -279,36 +281,39 @@ public abstract class AMSimulator extends TaskRunner.Task {
allocateRequest.setReleaseList(toRelease);
return allocateRequest;
}
protected AllocateRequest createAllocateRequest(List<ResourceRequest> ask) {
return createAllocateRequest(ask, new ArrayList<ContainerId>());
}
protected abstract void processResponseQueue() throws Exception;
protected abstract void sendContainerRequest() throws Exception;
public abstract void initReservation(
ReservationId reservationId, long deadline, long now);
protected abstract void checkStop();
private void submitApp(ReservationId reservationId)
throws YarnException, InterruptedException, IOException {
// ask for new application
GetNewApplicationRequest newAppRequest =
Records.newRecord(GetNewApplicationRequest.class);
GetNewApplicationResponse newAppResponse =
GetNewApplicationResponse newAppResponse =
rm.getClientRMService().getNewApplication(newAppRequest);
appId = newAppResponse.getApplicationId();
// submit the application
final SubmitApplicationRequest subAppRequest =
Records.newRecord(SubmitApplicationRequest.class);
ApplicationSubmissionContext appSubContext =
ApplicationSubmissionContext appSubContext =
Records.newRecord(ApplicationSubmissionContext.class);
appSubContext.setApplicationId(appId);
appSubContext.setMaxAppAttempts(1);
appSubContext.setQueue(queue);
appSubContext.setPriority(Priority.newInstance(0));
ContainerLaunchContext conLauContext =
ContainerLaunchContext conLauContext =
Records.newRecord(ContainerLaunchContext.class);
conLauContext.setApplicationACLs(new HashMap<>());
conLauContext.setCommands(new ArrayList<>());
@ -379,7 +384,7 @@ public abstract class AMSimulator extends TaskRunner.Task {
}
}
}
protected List<ResourceRequest> packageRequests(
List<ContainerSimulator> csList, int priority) {
// create requests

View File

@ -21,6 +21,7 @@ package org.apache.hadoop.yarn.sls.appmaster;
import java.io.IOException;
import java.security.PrivilegedExceptionAction;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
@ -34,6 +35,7 @@ import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
import org.apache.hadoop.yarn.api.protocolrecords.ReservationSubmissionRequest;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ReservationId;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ContainerExitStatus;
import org.apache.hadoop.yarn.api.records.ContainerId;
@ -42,8 +44,10 @@ import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
import org.apache.hadoop.yarn.sls.ReservationClientUtil;
import org.apache.hadoop.yarn.sls.scheduler.ContainerSimulator;
import org.apache.hadoop.yarn.sls.SLSRunner;
import org.apache.hadoop.yarn.util.resource.Resources;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -51,51 +55,51 @@ import org.slf4j.LoggerFactory;
@Unstable
public class MRAMSimulator extends AMSimulator {
/*
Vocabulary Used:
Vocabulary Used:
pending -> requests which are NOT yet sent to RM
scheduled -> requests which are sent to RM but not yet assigned
assigned -> requests which are assigned to a container
completed -> request corresponding to which container has completed
Maps are scheduled as soon as their requests are received. Reduces are
scheduled when all maps have finished (not support slow-start currently).
*/
private static final int PRIORITY_REDUCE = 10;
private static final int PRIORITY_MAP = 20;
// pending maps
private LinkedList<ContainerSimulator> pendingMaps =
new LinkedList<>();
// pending failed maps
private LinkedList<ContainerSimulator> pendingFailedMaps =
new LinkedList<ContainerSimulator>();
// scheduled maps
private LinkedList<ContainerSimulator> scheduledMaps =
new LinkedList<ContainerSimulator>();
// assigned maps
private Map<ContainerId, ContainerSimulator> assignedMaps =
new HashMap<ContainerId, ContainerSimulator>();
// reduces which are not yet scheduled
private LinkedList<ContainerSimulator> pendingReduces =
new LinkedList<ContainerSimulator>();
// pending failed reduces
private LinkedList<ContainerSimulator> pendingFailedReduces =
new LinkedList<ContainerSimulator>();
// scheduled reduces
private LinkedList<ContainerSimulator> scheduledReduces =
new LinkedList<ContainerSimulator>();
// assigned reduces
private Map<ContainerId, ContainerSimulator> assignedReduces =
new HashMap<ContainerId, ContainerSimulator>();
// all maps & reduces
private LinkedList<ContainerSimulator> allMaps =
new LinkedList<ContainerSimulator>();
@ -117,14 +121,14 @@ public class MRAMSimulator extends AMSimulator {
@SuppressWarnings("checkstyle:parameternumber")
public void init(int heartbeatInterval,
List<ContainerSimulator> containerList, ResourceManager rm, SLSRunner se,
long traceStartTime, long traceFinishTime, String user, String queue,
boolean isTracked, String oldAppId, ReservationSubmissionRequest rr,
long baselineStartTimeMS, Resource amContainerResource) {
long traceStartTime, long traceFinishTime, String user, String queue,
boolean isTracked, String oldAppId, long baselineStartTimeMS,
Resource amContainerResource) {
super.init(heartbeatInterval, containerList, rm, se,
traceStartTime, traceFinishTime, user, queue, isTracked, oldAppId,
rr, baselineStartTimeMS, amContainerResource);
baselineStartTimeMS, amContainerResource);
amtype = "mapreduce";
// get map/reduce tasks
for (ContainerSimulator cs : containerList) {
if (cs.getType().equals("map")) {
@ -202,7 +206,7 @@ public class MRAMSimulator extends AMSimulator {
}
}
}
// check finished
if (isAMContainerRunning &&
(mapFinished >= mapTotal) &&
@ -234,7 +238,7 @@ public class MRAMSimulator extends AMSimulator {
}
}
}
/**
* restart running because of the am container killed
*/
@ -322,7 +326,7 @@ public class MRAMSimulator extends AMSimulator {
if (ask == null) {
ask = new ArrayList<>();
}
final AllocateRequest request = createAllocateRequest(ask);
if (totalContainers == 0) {
request.setProgress(1.0f);
@ -348,6 +352,38 @@ public class MRAMSimulator extends AMSimulator {
}
}
@Override
public void initReservation(ReservationId reservationId, long deadline,
long now) {
Resource mapRes = getMaxResource(allMaps);
long mapDur = getMaxDuration(allMaps);
Resource redRes = getMaxResource(allReduces);
long redDur = getMaxDuration(allReduces);
ReservationSubmissionRequest rr = ReservationClientUtil.
createMRReservation(reservationId,
"reservation_" + reservationId.getId(), mapRes, allMaps.size(),
mapDur, redRes, allReduces.size(), redDur, now + traceStartTimeMS,
now + deadline, queue);
setReservationRequest(rr);
}
// Helper to compute the component-wise maximum resource used by any container
private Resource getMaxResource(Collection<ContainerSimulator> containers) {
return containers.parallelStream()
.map(ContainerSimulator::getResource)
.reduce(Resource.newInstance(0, 0), Resources::componentwiseMax);
}
// Helper to compute the maximum resource used by any map container
private long getMaxDuration(Collection<ContainerSimulator> containers) {
return containers.parallelStream()
.mapToLong(ContainerSimulator::getLifeTime)
.reduce(0L, Long::max);
}
@Override
protected void checkStop() {
if (isFinished) {

View File

@ -78,7 +78,7 @@ public class TestSLSRunner extends BaseSLSRunnerTest {
exitInvariantFile = "src/test/resources/exit-invariants.txt";
}
@Test(timeout = 60000)
@Test(timeout = 90000)
@SuppressWarnings("all")
public void testSimulatorRunning() throws Exception {
Configuration conf = new Configuration(false);

View File

@ -19,6 +19,7 @@ package org.apache.hadoop.yarn.sls.appmaster;
import com.codahale.metrics.MetricRegistry;
import org.apache.commons.io.FileUtils;
import org.apache.hadoop.yarn.api.records.ReservationId;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
@ -89,6 +90,10 @@ public class TestAMSimulator {
throws YarnException, IOException, InterruptedException {
}
@Override
public void initReservation(ReservationId id, long deadline, long now){
}
@Override
protected void checkStop() {
}
@ -134,7 +139,7 @@ public class TestAMSimulator {
String queue = "default";
List<ContainerSimulator> containers = new ArrayList<>();
app.init(1000, containers, rm, null, 0, 1000000L, "user1", queue, true,
appId, null, 0, SLSConfiguration.getAMContainerResource(conf));
appId, 0, SLSConfiguration.getAMContainerResource(conf));
app.firstStep();
verifySchedulerMetrics(appId);