From 16be42d3097c13b17d704e5b6dc8d66bd5ff6d9a Mon Sep 17 00:00:00 2001 From: Yufei Gu Date: Thu, 25 Jan 2018 13:06:01 -0800 Subject: [PATCH] YARN-7798. Refactor SLS Reservation Creation. Contributed by Young Chen. --- .../org/apache/hadoop/yarn/sls/SLSRunner.java | 75 +++++++++---------- .../yarn/sls/appmaster/AMSimulator.java | 41 +++++----- .../yarn/sls/appmaster/MRAMSimulator.java | 74 +++++++++++++----- .../apache/hadoop/yarn/sls/TestSLSRunner.java | 2 +- .../yarn/sls/appmaster/TestAMSimulator.java | 7 +- 5 files changed, 119 insertions(+), 80 deletions(-) diff --git a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/SLSRunner.java b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/SLSRunner.java index ad4310f3648..456602f19a4 100644 --- a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/SLSRunner.java +++ b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/SLSRunner.java @@ -57,7 +57,6 @@ import org.apache.hadoop.tools.rumen.TaskAttemptInfo; import org.apache.hadoop.util.ReflectionUtils; import org.apache.hadoop.util.Tool; import org.apache.hadoop.util.ToolRunner; -import org.apache.hadoop.yarn.api.protocolrecords.ReservationSubmissionRequest; import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.NodeState; import org.apache.hadoop.yarn.api.records.ReservationId; @@ -444,7 +443,7 @@ public class SLSRunner extends Configured implements Tool { for (int i = 0; i < jobCount; i++) { runNewAM(amType, user, queue, oldAppId, jobStartTime, jobFinishTime, - getTaskContainers(jsonJob), null, getAMContainerResource(jsonJob)); + getTaskContainers(jsonJob), getAMContainerResource(jsonJob)); } } @@ -607,7 +606,7 @@ public class SLSRunner extends Configured implements Tool { // Only supports the default job type currently runNewAM(SLSUtils.DEFAULT_JOB_TYPE, user, jobQueue, oldJobId, - jobStartTimeMS, jobFinishTimeMS, containerList, null, + jobStartTimeMS, jobFinishTimeMS, containerList, getAMContainerResource(null)); } @@ -628,10 +627,6 @@ public class SLSRunner extends Configured implements Tool { localConf.set("fs.defaultFS", "file:///"); long baselineTimeMS = 0; - // reservations use wall clock time, so need to have a reference for that - UTCClock clock = new UTCClock(); - long now = clock.getTime(); - try { // if we use the nodeFile this could have been not initialized yet. @@ -670,13 +665,12 @@ public class SLSRunner extends Configured implements Tool { ArrayList keyAsArray = new ArrayList(nmMap.keySet()); Random rand = new Random(stjp.getSeed()); - Resource maxMapRes = Resource.newInstance(0, 0); - long maxMapDur = 0; // map tasks for (int i = 0; i < job.getNumberMaps(); i++) { TaskAttemptInfo tai = job.getTaskAttemptInfo(TaskType.MAP, i, 0); - RMNode node = nmMap - .get(keyAsArray.get(rand.nextInt(keyAsArray.size()))).getNode(); + RMNode node = + nmMap.get(keyAsArray.get(rand.nextInt(keyAsArray.size()))) + .getNode(); String hostname = "/" + node.getRackName() + "/" + node.getHostName(); long containerLifeTime = tai.getRuntime(); Resource containerResource = @@ -684,55 +678,39 @@ public class SLSRunner extends Configured implements Tool { (int) tai.getTaskInfo().getTaskVCores()); containerList.add(new ContainerSimulator(containerResource, containerLifeTime, hostname, DEFAULT_MAPPER_PRIORITY, "map")); - maxMapRes = Resources.componentwiseMax(maxMapRes, containerResource); - maxMapDur = - containerLifeTime > maxMapDur ? containerLifeTime : maxMapDur; - } - Resource maxRedRes = Resource.newInstance(0, 0); - long maxRedDur = 0; // reduce tasks for (int i = 0; i < job.getNumberReduces(); i++) { TaskAttemptInfo tai = job.getTaskAttemptInfo(TaskType.REDUCE, i, 0); - RMNode node = nmMap - .get(keyAsArray.get(rand.nextInt(keyAsArray.size()))).getNode(); + RMNode node = + nmMap.get(keyAsArray.get(rand.nextInt(keyAsArray.size()))) + .getNode(); String hostname = "/" + node.getRackName() + "/" + node.getHostName(); long containerLifeTime = tai.getRuntime(); Resource containerResource = Resource.newInstance((int) tai.getTaskInfo().getTaskMemory(), (int) tai.getTaskInfo().getTaskVCores()); - containerList.add(new ContainerSimulator(containerResource, - containerLifeTime, hostname, DEFAULT_REDUCER_PRIORITY, "reduce")); - maxRedRes = Resources.componentwiseMax(maxRedRes, containerResource); - maxRedDur = - containerLifeTime > maxRedDur ? containerLifeTime : maxRedDur; - + containerList.add( + new ContainerSimulator(containerResource, containerLifeTime, + hostname, DEFAULT_REDUCER_PRIORITY, "reduce")); } - // generating reservations for the jobs that require them + ReservationId reservationId = null; - ReservationSubmissionRequest rr = null; if (job.hasDeadline()) { - ReservationId reservationId = + reservationId = ReservationId.newInstance(this.rm.getStartTime(), AM_ID); - - rr = ReservationClientUtil.createMRReservation(reservationId, - "reservation_" + AM_ID, maxMapRes, job.getNumberMaps(), maxMapDur, - maxRedRes, job.getNumberReduces(), maxRedDur, - now + jobStartTimeMS, now + job.getDeadline(), - job.getQueueName()); - } runNewAM(SLSUtils.DEFAULT_JOB_TYPE, user, jobQueue, oldJobId, - jobStartTimeMS, jobFinishTimeMS, containerList, rr, - getAMContainerResource(null)); + jobStartTimeMS, jobFinishTimeMS, containerList, reservationId, + job.getDeadline(), getAMContainerResource(null)); + } } finally { stjp.close(); } - } private Resource getAMContainerResource(Map jsonJob) { @@ -772,7 +750,17 @@ public class SLSRunner extends Configured implements Tool { private void runNewAM(String jobType, String user, String jobQueue, String oldJobId, long jobStartTimeMS, long jobFinishTimeMS, List containerList, - ReservationSubmissionRequest rr, Resource amContainerResource) { + Resource amContainerResource) { + runNewAM(jobType, user, jobQueue, oldJobId, jobStartTimeMS, + jobFinishTimeMS, containerList, null, -1, + amContainerResource); + } + + private void runNewAM(String jobType, String user, + String jobQueue, String oldJobId, long jobStartTimeMS, + long jobFinishTimeMS, List containerList, + ReservationId reservationId, long deadline, + Resource amContainerResource) { AMSimulator amSim = (AMSimulator) ReflectionUtils.newInstance( amClassMap.get(jobType), new Configuration()); @@ -787,10 +775,15 @@ public class SLSRunner extends Configured implements Tool { oldJobId = Integer.toString(AM_ID); } AM_ID++; - amSim.init(heartbeatInterval, containerList, rm, this, jobStartTimeMS, - jobFinishTimeMS, user, jobQueue, isTracked, oldJobId, rr, + jobFinishTimeMS, user, jobQueue, isTracked, oldJobId, runner.getStartTimeMS(), amContainerResource); + if(reservationId != null) { + // if we have a ReservationId, delegate reservation creation to + // AMSim (reservation shape is impl specific) + UTCClock clock = new UTCClock(); + amSim.initReservation(reservationId, deadline, clock.getTime()); + } runner.schedule(amSim); maxRuntime = Math.max(maxRuntime, jobFinishTimeMS); numTasks += containerList.size(); diff --git a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/appmaster/AMSimulator.java b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/appmaster/AMSimulator.java index 72698ea10a5..5727b5f37ba 100644 --- a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/appmaster/AMSimulator.java +++ b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/appmaster/AMSimulator.java @@ -85,7 +85,7 @@ public abstract class AMSimulator extends TaskRunner.Task { protected final BlockingQueue responseQueue; private int responseId = 0; // user name - protected String user; + private String user; // queue name protected String queue; // am type @@ -105,7 +105,7 @@ public abstract class AMSimulator extends TaskRunner.Task { // waiting for AM container volatile boolean isAMContainerRunning = false; volatile Container amContainer; - + private static final Logger LOG = LoggerFactory.getLogger(AMSimulator.class); private Resource amContainerResource; @@ -120,9 +120,8 @@ public abstract class AMSimulator extends TaskRunner.Task { public void init(int heartbeatInterval, List containerList, ResourceManager resourceManager, SLSRunner slsRunnner, long startTime, long finishTime, String simUser, - String simQueue, boolean tracked, String oldApp, - ReservationSubmissionRequest rr, long baseTimeMS, - Resource amContainerResource) { + String simQueue, boolean tracked, String oldApp, long baseTimeMS, + Resource amResource) { super.init(startTime, startTime + 1000000L * heartbeatInterval, heartbeatInterval); this.user = simUser; @@ -134,8 +133,7 @@ public abstract class AMSimulator extends TaskRunner.Task { this.baselineTimeMS = baseTimeMS; this.traceStartTimeMS = startTime; this.traceFinishTimeMS = finishTime; - this.reservationRequest = rr; - this.amContainerResource = amContainerResource; + this.amContainerResource = amResource; } /** @@ -171,6 +169,10 @@ public abstract class AMSimulator extends TaskRunner.Task { isAMContainerRunning = true; } + protected void setReservationRequest(ReservationSubmissionRequest rr){ + this.reservationRequest = rr; + } + private ReservationId submitReservationWhenSpecified() throws IOException, InterruptedException { if (reservationRequest != null) { @@ -256,7 +258,7 @@ public abstract class AMSimulator extends TaskRunner.Task { simulateStartTimeMS, simulateFinishTimeMS); } } - + protected ResourceRequest createResourceRequest( Resource resource, String host, int priority, int numContainers) { ResourceRequest request = recordFactory @@ -269,7 +271,7 @@ public abstract class AMSimulator extends TaskRunner.Task { request.setPriority(prio); return request; } - + protected AllocateRequest createAllocateRequest(List ask, List toRelease) { AllocateRequest allocateRequest = @@ -279,36 +281,39 @@ public abstract class AMSimulator extends TaskRunner.Task { allocateRequest.setReleaseList(toRelease); return allocateRequest; } - + protected AllocateRequest createAllocateRequest(List ask) { return createAllocateRequest(ask, new ArrayList()); } protected abstract void processResponseQueue() throws Exception; - + protected abstract void sendContainerRequest() throws Exception; - + + public abstract void initReservation( + ReservationId reservationId, long deadline, long now); + protected abstract void checkStop(); - + private void submitApp(ReservationId reservationId) throws YarnException, InterruptedException, IOException { // ask for new application GetNewApplicationRequest newAppRequest = Records.newRecord(GetNewApplicationRequest.class); - GetNewApplicationResponse newAppResponse = + GetNewApplicationResponse newAppResponse = rm.getClientRMService().getNewApplication(newAppRequest); appId = newAppResponse.getApplicationId(); - + // submit the application final SubmitApplicationRequest subAppRequest = Records.newRecord(SubmitApplicationRequest.class); - ApplicationSubmissionContext appSubContext = + ApplicationSubmissionContext appSubContext = Records.newRecord(ApplicationSubmissionContext.class); appSubContext.setApplicationId(appId); appSubContext.setMaxAppAttempts(1); appSubContext.setQueue(queue); appSubContext.setPriority(Priority.newInstance(0)); - ContainerLaunchContext conLauContext = + ContainerLaunchContext conLauContext = Records.newRecord(ContainerLaunchContext.class); conLauContext.setApplicationACLs(new HashMap<>()); conLauContext.setCommands(new ArrayList<>()); @@ -379,7 +384,7 @@ public abstract class AMSimulator extends TaskRunner.Task { } } } - + protected List packageRequests( List csList, int priority) { // create requests diff --git a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/appmaster/MRAMSimulator.java b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/appmaster/MRAMSimulator.java index 21bf05402b0..18a155cb2a6 100644 --- a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/appmaster/MRAMSimulator.java +++ b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/appmaster/MRAMSimulator.java @@ -21,6 +21,7 @@ package org.apache.hadoop.yarn.sls.appmaster; import java.io.IOException; import java.security.PrivilegedExceptionAction; import java.util.ArrayList; +import java.util.Collection; import java.util.HashMap; import java.util.LinkedList; import java.util.List; @@ -34,6 +35,7 @@ import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest; import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse; import org.apache.hadoop.yarn.api.protocolrecords.ReservationSubmissionRequest; import org.apache.hadoop.yarn.api.records.Container; +import org.apache.hadoop.yarn.api.records.ReservationId; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.ContainerExitStatus; import org.apache.hadoop.yarn.api.records.ContainerId; @@ -42,8 +44,10 @@ import org.apache.hadoop.yarn.api.records.ResourceRequest; import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.security.AMRMTokenIdentifier; import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager; +import org.apache.hadoop.yarn.sls.ReservationClientUtil; import org.apache.hadoop.yarn.sls.scheduler.ContainerSimulator; import org.apache.hadoop.yarn.sls.SLSRunner; +import org.apache.hadoop.yarn.util.resource.Resources; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -51,51 +55,51 @@ import org.slf4j.LoggerFactory; @Unstable public class MRAMSimulator extends AMSimulator { /* - Vocabulary Used: + Vocabulary Used: pending -> requests which are NOT yet sent to RM scheduled -> requests which are sent to RM but not yet assigned assigned -> requests which are assigned to a container completed -> request corresponding to which container has completed - + Maps are scheduled as soon as their requests are received. Reduces are scheduled when all maps have finished (not support slow-start currently). */ - + private static final int PRIORITY_REDUCE = 10; private static final int PRIORITY_MAP = 20; // pending maps private LinkedList pendingMaps = new LinkedList<>(); - + // pending failed maps private LinkedList pendingFailedMaps = new LinkedList(); - + // scheduled maps private LinkedList scheduledMaps = new LinkedList(); - + // assigned maps private Map assignedMaps = new HashMap(); - + // reduces which are not yet scheduled private LinkedList pendingReduces = new LinkedList(); - + // pending failed reduces private LinkedList pendingFailedReduces = new LinkedList(); - + // scheduled reduces private LinkedList scheduledReduces = new LinkedList(); - + // assigned reduces private Map assignedReduces = new HashMap(); - + // all maps & reduces private LinkedList allMaps = new LinkedList(); @@ -117,14 +121,14 @@ public class MRAMSimulator extends AMSimulator { @SuppressWarnings("checkstyle:parameternumber") public void init(int heartbeatInterval, List containerList, ResourceManager rm, SLSRunner se, - long traceStartTime, long traceFinishTime, String user, String queue, - boolean isTracked, String oldAppId, ReservationSubmissionRequest rr, - long baselineStartTimeMS, Resource amContainerResource) { + long traceStartTime, long traceFinishTime, String user, String queue, + boolean isTracked, String oldAppId, long baselineStartTimeMS, + Resource amContainerResource) { super.init(heartbeatInterval, containerList, rm, se, traceStartTime, traceFinishTime, user, queue, isTracked, oldAppId, - rr, baselineStartTimeMS, amContainerResource); + baselineStartTimeMS, amContainerResource); amtype = "mapreduce"; - + // get map/reduce tasks for (ContainerSimulator cs : containerList) { if (cs.getType().equals("map")) { @@ -202,7 +206,7 @@ public class MRAMSimulator extends AMSimulator { } } } - + // check finished if (isAMContainerRunning && (mapFinished >= mapTotal) && @@ -234,7 +238,7 @@ public class MRAMSimulator extends AMSimulator { } } } - + /** * restart running because of the am container killed */ @@ -322,7 +326,7 @@ public class MRAMSimulator extends AMSimulator { if (ask == null) { ask = new ArrayList<>(); } - + final AllocateRequest request = createAllocateRequest(ask); if (totalContainers == 0) { request.setProgress(1.0f); @@ -348,6 +352,38 @@ public class MRAMSimulator extends AMSimulator { } } + @Override + public void initReservation(ReservationId reservationId, long deadline, + long now) { + + Resource mapRes = getMaxResource(allMaps); + long mapDur = getMaxDuration(allMaps); + Resource redRes = getMaxResource(allReduces); + long redDur = getMaxDuration(allReduces); + + ReservationSubmissionRequest rr = ReservationClientUtil. + createMRReservation(reservationId, + "reservation_" + reservationId.getId(), mapRes, allMaps.size(), + mapDur, redRes, allReduces.size(), redDur, now + traceStartTimeMS, + now + deadline, queue); + + setReservationRequest(rr); + } + + // Helper to compute the component-wise maximum resource used by any container + private Resource getMaxResource(Collection containers) { + return containers.parallelStream() + .map(ContainerSimulator::getResource) + .reduce(Resource.newInstance(0, 0), Resources::componentwiseMax); + } + + // Helper to compute the maximum resource used by any map container + private long getMaxDuration(Collection containers) { + return containers.parallelStream() + .mapToLong(ContainerSimulator::getLifeTime) + .reduce(0L, Long::max); + } + @Override protected void checkStop() { if (isFinished) { diff --git a/hadoop-tools/hadoop-sls/src/test/java/org/apache/hadoop/yarn/sls/TestSLSRunner.java b/hadoop-tools/hadoop-sls/src/test/java/org/apache/hadoop/yarn/sls/TestSLSRunner.java index 567f0d9d3b8..abb3b5e904a 100644 --- a/hadoop-tools/hadoop-sls/src/test/java/org/apache/hadoop/yarn/sls/TestSLSRunner.java +++ b/hadoop-tools/hadoop-sls/src/test/java/org/apache/hadoop/yarn/sls/TestSLSRunner.java @@ -78,7 +78,7 @@ public class TestSLSRunner extends BaseSLSRunnerTest { exitInvariantFile = "src/test/resources/exit-invariants.txt"; } - @Test(timeout = 60000) + @Test(timeout = 90000) @SuppressWarnings("all") public void testSimulatorRunning() throws Exception { Configuration conf = new Configuration(false); diff --git a/hadoop-tools/hadoop-sls/src/test/java/org/apache/hadoop/yarn/sls/appmaster/TestAMSimulator.java b/hadoop-tools/hadoop-sls/src/test/java/org/apache/hadoop/yarn/sls/appmaster/TestAMSimulator.java index 02dc26eeaf9..a67845bb8e5 100644 --- a/hadoop-tools/hadoop-sls/src/test/java/org/apache/hadoop/yarn/sls/appmaster/TestAMSimulator.java +++ b/hadoop-tools/hadoop-sls/src/test/java/org/apache/hadoop/yarn/sls/appmaster/TestAMSimulator.java @@ -19,6 +19,7 @@ package org.apache.hadoop.yarn.sls.appmaster; import com.codahale.metrics.MetricRegistry; import org.apache.commons.io.FileUtils; +import org.apache.hadoop.yarn.api.records.ReservationId; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager; @@ -89,6 +90,10 @@ public class TestAMSimulator { throws YarnException, IOException, InterruptedException { } + @Override + public void initReservation(ReservationId id, long deadline, long now){ + } + @Override protected void checkStop() { } @@ -134,7 +139,7 @@ public class TestAMSimulator { String queue = "default"; List containers = new ArrayList<>(); app.init(1000, containers, rm, null, 0, 1000000L, "user1", queue, true, - appId, null, 0, SLSConfiguration.getAMContainerResource(conf)); + appId, 0, SLSConfiguration.getAMContainerResource(conf)); app.firstStep(); verifySchedulerMetrics(appId);