diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt index 6a1d136bd23..949ba636704 100644 --- a/hadoop-yarn-project/CHANGES.txt +++ b/hadoop-yarn-project/CHANGES.txt @@ -160,6 +160,9 @@ Release 2.0.5-beta - UNRELEASED tokens for app attempt so that RM can be restarted while preserving current applications. (Jian He via vinodkv) + YARN-568. Add support for work preserving preemption to the FairScheduler. + (Carlo Curino and Sandy Ryza via cdouglas) + YARN-598. Add virtual cores to queue metrics. (sandyr via tucu) OPTIMIZATIONS diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java index 02bb458663b..3094a93ff08 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java @@ -21,7 +21,9 @@ package org.apache.hadoop.yarn.server.resourcemanager; import java.io.IOException; import java.net.InetSocketAddress; import java.util.ArrayList; +import java.util.HashSet; import java.util.List; +import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; @@ -39,6 +41,11 @@ import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest; import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse; import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest; import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterResponse; +import org.apache.hadoop.yarn.api.protocolrecords.PreemptionContainer; +import org.apache.hadoop.yarn.api.protocolrecords.PreemptionContract; +import org.apache.hadoop.yarn.api.protocolrecords.PreemptionResourceRequest; +import org.apache.hadoop.yarn.api.protocolrecords.StrictPreemptionContract; +import org.apache.hadoop.yarn.api.protocolrecords.PreemptionMessage; import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest; import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; @@ -341,9 +348,65 @@ public class ApplicationMasterService extends AbstractService implements } allocateResponse.setNumClusterNodes(this.rScheduler.getNumClusterNodes()); + + // add preemption to the allocateResponse message (if any) + allocateResponse.setPreemptionMessage(generatePreemptionMessage(allocation)); + return allocateResponse; } } + + private PreemptionMessage generatePreemptionMessage(Allocation allocation){ + PreemptionMessage pMsg = null; + // assemble strict preemption request + if (allocation.getStrictContainerPreemptions() != null) { + pMsg = + recordFactory.newRecordInstance(PreemptionMessage.class); + StrictPreemptionContract pStrict = + recordFactory.newRecordInstance(StrictPreemptionContract.class); + Set pCont = new HashSet(); + for (ContainerId cId : allocation.getStrictContainerPreemptions()) { + PreemptionContainer pc = + recordFactory.newRecordInstance(PreemptionContainer.class); + pc.setId(cId); + pCont.add(pc); + } + pStrict.setContainers(pCont); + pMsg.setStrictContract(pStrict); + } + + // assemble negotiable preemption request + if (allocation.getResourcePreemptions() != null && + allocation.getResourcePreemptions().size() > 0 && + allocation.getContainerPreemptions() != null && + allocation.getContainerPreemptions().size() > 0) { + if (pMsg == null) { + pMsg = + recordFactory.newRecordInstance(PreemptionMessage.class); + } + PreemptionContract contract = + recordFactory.newRecordInstance(PreemptionContract.class); + Set pCont = new HashSet(); + for (ContainerId cId : allocation.getContainerPreemptions()) { + PreemptionContainer pc = + recordFactory.newRecordInstance(PreemptionContainer.class); + pc.setId(cId); + pCont.add(pc); + } + List pRes = new ArrayList(); + for (ResourceRequest crr : allocation.getResourcePreemptions()) { + PreemptionResourceRequest prr = + recordFactory.newRecordInstance(PreemptionResourceRequest.class); + prr.setResourceRequest(crr); + pRes.add(prr); + } + contract.setContainers(pCont); + contract.setResourceRequest(pRes); + pMsg.setContract(contract); + } + + return pMsg; + } public void registerAppAttempt(ApplicationAttemptId attemptId) { AllocateResponse response = diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/Allocation.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/Allocation.java index cfef9be5054..89055048608 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/Allocation.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/Allocation.java @@ -19,17 +19,43 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler; import java.util.List; +import java.util.Set; import org.apache.hadoop.yarn.api.records.Container; +import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.api.records.ResourceRequest; +import org.apache.hadoop.yarn.factories.RecordFactory; +import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; public class Allocation { + + private final RecordFactory recordFactory = + RecordFactoryProvider.getRecordFactory(null); + final List containers; final Resource resourceLimit; - + final Set strictContainers; + final Set fungibleContainers; + final List fungibleResources; + public Allocation(List containers, Resource resourceLimit) { + this(containers, resourceLimit, null, null, null); + } + + public Allocation(List containers, Resource resourceLimit, + Set strictContainers) { + this(containers, resourceLimit, strictContainers, null, null); + } + + public Allocation(List containers, Resource resourceLimit, + Set strictContainers, Set fungibleContainers, + List fungibleResources) { this.containers = containers; this.resourceLimit = resourceLimit; + this.strictContainers = strictContainers; + this.fungibleContainers = fungibleContainers; + this.fungibleResources = fungibleResources; } public List getContainers() { @@ -39,5 +65,17 @@ public class Allocation { public Resource getResourceLimit() { return resourceLimit; } - + + public Set getStrictContainerPreemptions() { + return strictContainers; + } + + public Set getContainerPreemptions() { + return fungibleContainers; + } + + public List getResourcePreemptions() { + return fungibleResources; + } + } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSSchedulerApp.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSSchedulerApp.java index 426b2335d54..9db15a6bc0b 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSSchedulerApp.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSSchedulerApp.java @@ -23,6 +23,7 @@ import java.util.Collection; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Set; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -86,7 +87,9 @@ public class FSSchedulerApp extends SchedulerApplication { final Map> reservedContainers = new HashMap>(); - + + final Map preemptionMap = new HashMap(); + /** * Count how many times the application has been given an opportunity * to schedule a task at each priority. Each time the scheduler @@ -233,6 +236,9 @@ public class FSSchedulerApp extends SchedulerApplication { Resource containerResource = rmContainer.getContainer().getResource(); queue.getMetrics().releaseResources(getUser(), 1, containerResource); Resources.subtractFrom(currentConsumption, containerResource); + + // remove from preemption map if it is completed + preemptionMap.remove(rmContainer); } synchronized public List pullNewlyAllocatedContainers() { @@ -574,4 +580,18 @@ public class FSSchedulerApp extends SchedulerApplication { " priority " + priority); allowedLocalityLevel.put(priority, level); } + + // related methods + public void addPreemption(RMContainer container, long time) { + assert preemptionMap.get(container) == null; + preemptionMap.put(container, time); + } + + public Long getContainerPreemptionTime(RMContainer container) { + return preemptionMap.get(container); + } + + public Set getPreemptionContainers() { + return preemptionMap.keySet(); + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java index c471cb84436..5904ae8b22d 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java @@ -24,8 +24,11 @@ import java.util.Collection; import java.util.Collections; import java.util.Comparator; import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; import java.util.List; import java.util.Map; +import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import org.apache.commons.logging.Log; @@ -155,10 +158,16 @@ public class FairScheduler implements ResourceScheduler { private Resource clusterCapacity = RecordFactoryProvider.getRecordFactory(null).newRecordInstance(Resource.class); - // How often tasks are preempted (must be longer than a couple + // How often tasks are preempted + protected long preemptionInterval; + + // ms to wait before force killing stuff (must be longer than a couple // of heartbeats to give task-kill commands a chance to act). - protected long preemptionInterval = 15000; - + protected long waitTimeBeforeKill; + + // Containers whose AMs have been warned that they will be preempted soon. + private List warnedContainers = new ArrayList(); + protected boolean preemptionEnabled; protected boolean sizeBasedWeight; // Give larger weights to larger jobs protected WeightAdjuster weightAdjuster; // Can be null for no weight adjuster @@ -331,34 +340,78 @@ public class FairScheduler implements ResourceScheduler { // Sort containers into reverse order of priority Collections.sort(runningContainers, new Comparator() { public int compare(RMContainer c1, RMContainer c2) { - return c2.getContainer().getPriority().compareTo( + int ret = c2.getContainer().getPriority().compareTo( c1.getContainer().getPriority()); + if (ret == 0) { + return c2.getContainerId().compareTo(c1.getContainerId()); + } + return ret; } }); + + // Scan down the list of containers we've already warned and kill them + // if we need to. Remove any containers from the list that we don't need + // or that are no longer running. + Iterator warnedIter = warnedContainers.iterator(); + Set preemptedThisRound = new HashSet(); + while (warnedIter.hasNext()) { + RMContainer container = warnedIter.next(); + if (container.getState() == RMContainerState.RUNNING && + Resources.greaterThan(RESOURCE_CALCULATOR, clusterCapacity, + toPreempt, Resources.none())) { + warnOrKillContainer(container, apps.get(container), queues.get(container)); + preemptedThisRound.add(container); + Resources.subtractFrom(toPreempt, container.getContainer().getResource()); + } else { + warnedIter.remove(); + } + } - // Scan down the sorted list of task statuses until we've killed enough - // tasks, making sure we don't kill too many from any queue - for (RMContainer container : runningContainers) { + // Scan down the rest of the containers until we've preempted enough, making + // sure we don't preempt too many from any queue + Iterator runningIter = runningContainers.iterator(); + while (runningIter.hasNext() && + Resources.greaterThan(RESOURCE_CALCULATOR, clusterCapacity, + toPreempt, Resources.none())) { + RMContainer container = runningIter.next(); FSLeafQueue sched = queues.get(container); - if (Resources.greaterThan(RESOURCE_CALCULATOR, clusterCapacity, - sched.getResourceUsage(), sched.getFairShare())) { - LOG.info("Preempting container (prio=" + container.getContainer().getPriority() + - "res=" + container.getContainer().getResource() + - ") from queue " + sched.getName()); - ContainerStatus status = SchedulerUtils.createAbnormalContainerStatus( + if (!preemptedThisRound.contains(container) && + Resources.greaterThan(RESOURCE_CALCULATOR, clusterCapacity, + sched.getResourceUsage(), sched.getFairShare())) { + warnOrKillContainer(container, apps.get(container), sched); + + warnedContainers.add(container); + Resources.subtractFrom(toPreempt, container.getContainer().getResource()); + } + } + } + + private void warnOrKillContainer(RMContainer container, FSSchedulerApp app, + FSLeafQueue queue) { + LOG.info("Preempting container (prio=" + container.getContainer().getPriority() + + "res=" + container.getContainer().getResource() + + ") from queue " + queue.getName()); + + Long time = app.getContainerPreemptionTime(container); + + if (time != null) { + // if we asked for preemption more than maxWaitTimeBeforeKill ms ago, + // proceed with kill + if (time + waitTimeBeforeKill < clock.getTime()) { + ContainerStatus status = + SchedulerUtils.createAbnormalContainerStatus( container.getContainerId(), SchedulerUtils.PREEMPTED_CONTAINER); // TODO: Not sure if this ever actually adds this to the list of cleanup // containers on the RMNode (see SchedulerNode.releaseContainer()). completedContainer(container, status, RMContainerEventType.KILL); - - toPreempt = Resources.subtract(toPreempt, - container.getContainer().getResource()); - if (Resources.lessThanOrEqual(RESOURCE_CALCULATOR, clusterCapacity, - toPreempt, Resources.none())) { - break; - } + LOG.info("Killing container" + container + + " (after waiting for premption for " + + (clock.getTime() - time) + "ms)"); } + } else { + // track the request in the FSSchedulerApp itself + app.addPreemption(container, clock.getTime()); } } @@ -483,11 +536,11 @@ public class FairScheduler implements ResourceScheduler { return clusterCapacity; } - public Clock getClock() { + public synchronized Clock getClock() { return clock; } - protected void setClock(Clock clock) { + protected synchronized void setClock(Clock clock) { this.clock = clock; } @@ -745,10 +798,18 @@ public class FairScheduler implements ResourceScheduler { LOG.debug("allocate:" + " applicationAttemptId=" + appAttemptId + " #ask=" + ask.size()); - } + LOG.debug("Preempting " + application.getPreemptionContainers().size() + + " container(s)"); + } + + Set preemptionContainerIds = new HashSet(); + for (RMContainer container : application.getPreemptionContainers()) { + preemptionContainerIds.add(container.getContainerId()); + } + return new Allocation(application.pullNewlyAllocatedContainers(), - application.getHeadroom()); + application.getHeadroom(), preemptionContainerIds); } } @@ -963,7 +1024,9 @@ public class FairScheduler implements ResourceScheduler { assignMultiple = this.conf.getAssignMultiple(); maxAssign = this.conf.getMaxAssign(); sizeBasedWeight = this.conf.getSizeBasedWeight(); - + preemptionInterval = this.conf.getPreemptionInterval(); + waitTimeBeforeKill = this.conf.getWaitTimeBeforeKill(); + if (!initialized) { rootMetrics = QueueMetrics.forQueue("root", null, true, conf); this.rmContext = rmContext; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerConfiguration.java index e2dd054f9d7..05307cc31b5 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerConfiguration.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerConfiguration.java @@ -55,6 +55,11 @@ public class FairSchedulerConfiguration extends Configuration { /** Whether preemption is enabled. */ protected static final String PREEMPTION = CONF_PREFIX + "preemption"; protected static final boolean DEFAULT_PREEMPTION = false; + + protected static final String PREEMPTION_INTERVAL = CONF_PREFIX + "preemptionInterval"; + protected static final int DEFAULT_PREEMPTION_INTERVAL = 5000; + protected static final String WAIT_TIME_BEFORE_KILL = CONF_PREFIX + "waitTimeBeforeKill"; + protected static final int DEFAULT_WAIT_TIME_BEFORE_KILL = 15000; /** Whether to assign multiple containers in one check-in. */ protected static final String ASSIGN_MULTIPLE = CONF_PREFIX + "assignmultiple"; @@ -123,4 +128,12 @@ public class FairSchedulerConfiguration extends Configuration { return get(EVENT_LOG_DIR, new File(System.getProperty("hadoop.log.dir", "/tmp/")).getAbsolutePath() + File.separator + "fairscheduler"); } + + public int getPreemptionInterval() { + return getInt(PREEMPTION_INTERVAL, DEFAULT_PREEMPTION_INTERVAL); + } + + public int getWaitTimeBeforeKill() { + return getInt(WAIT_TIME_BEFORE_KILL, DEFAULT_WAIT_TIME_BEFORE_KILL); + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java index f26e8234c79..e83e58b4bd3 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java @@ -30,6 +30,7 @@ import java.io.PrintWriter; import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; +import java.util.Collections; import java.util.List; import java.util.Map; @@ -900,9 +901,16 @@ public class TestFairScheduler { */ public void testChoiceOfPreemptedContainers() throws Exception { Configuration conf = createConfiguration(); + + conf.setLong(FairSchedulerConfiguration.PREEMPTION_INTERVAL, 5000); + conf.setLong(FairSchedulerConfiguration.WAIT_TIME_BEFORE_KILL, 10000); + conf.set(FairSchedulerConfiguration.ALLOCATION_FILE + ".allocation.file", ALLOC_FILE); scheduler.reinitialize(conf, resourceManager.getRMContext()); - + + MockClock clock = new MockClock(); + scheduler.setClock(clock); + PrintWriter out = new PrintWriter(new FileWriter(ALLOC_FILE)); out.println(""); out.println(""); @@ -997,15 +1005,38 @@ public class TestFairScheduler { Resources.createResource(2 * 1024)); assertEquals(1, scheduler.applications.get(app1).getLiveContainers().size()); assertEquals(1, scheduler.applications.get(app2).getLiveContainers().size()); - assertEquals(0, scheduler.applications.get(app3).getLiveContainers().size()); assertEquals(1, scheduler.applications.get(app4).getLiveContainers().size()); assertEquals(1, scheduler.applications.get(app5).getLiveContainers().size()); + + // First verify we are adding containers to preemption list for the application + assertTrue(!Collections.disjoint(scheduler.applications.get(app3).getLiveContainers(), + scheduler.applications.get(app3).getPreemptionContainers())); + assertTrue(!Collections.disjoint(scheduler.applications.get(app6).getLiveContainers(), + scheduler.applications.get(app6).getPreemptionContainers())); + + // Pretend 15 seconds have passed + clock.tick(15); + + // Trigger a kill by insisting we want containers back + scheduler.preemptResources(scheduler.getQueueManager().getLeafQueues(), + Resources.createResource(2 * 1024)); + + // At this point the containers should have been killed (since we are not simulating AM) assertEquals(0, scheduler.applications.get(app6).getLiveContainers().size()); + assertEquals(0, scheduler.applications.get(app3).getLiveContainers().size()); + + // Trigger a kill by insisting we want containers back + scheduler.preemptResources(scheduler.getQueueManager().getLeafQueues(), + Resources.createResource(2 * 1024)); + + // Pretend 15 seconds have passed + clock.tick(15); // We should be able to claw back another container from A and B each. // Make sure it is lowest priority container. scheduler.preemptResources(scheduler.getQueueManager().getLeafQueues(), Resources.createResource(2 * 1024)); + assertEquals(1, scheduler.applications.get(app1).getLiveContainers().size()); assertEquals(0, scheduler.applications.get(app2).getLiveContainers().size()); assertEquals(0, scheduler.applications.get(app3).getLiveContainers().size());