YARN-568. Add support for work preserving preemption to the FairScheduler.

Contributed by Carlo Curino and Sandy Ryza


git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1480778 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Christopher Douglas 2013-05-09 21:21:42 +00:00
parent d303f01297
commit 505fe26539
7 changed files with 264 additions and 33 deletions

View File

@ -10,9 +10,6 @@ Trunk - Unreleased
Azure environments. (See breakdown of tasks below for subtasks and
contributors)
YARN-45. Add protocol for schedulers to request containers back from
ApplicationMasters. (Carlo Curino, cdouglas)
IMPROVEMENTS
YARN-84. Use Builder to build RPC server. (Brandon Li via suresh)
@ -129,6 +126,9 @@ Release 2.0.5-beta - UNRELEASED
YARN-482. FS: Extend SchedulingMode to intermediate queues.
(kkambatl via tucu)
YARN-45. Add protocol for schedulers to request containers back from
ApplicationMasters. (Carlo Curino, cdouglas)
IMPROVEMENTS
YARN-365. Change NM heartbeat handling to not generate a scheduler event
@ -235,6 +235,9 @@ Release 2.0.5-beta - UNRELEASED
tokens for app attempt so that RM can be restarted while preserving current
applications. (Jian He via vinodkv)
YARN-568. Add support for work preserving preemption to the FairScheduler.
(Carlo Curino and Sandy Ryza via cdouglas)
OPTIMIZATIONS
BUG FIXES

View File

@ -21,7 +21,9 @@ package org.apache.hadoop.yarn.server.resourcemanager;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
@ -39,6 +41,11 @@ import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest;
import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterResponse;
import org.apache.hadoop.yarn.api.protocolrecords.PreemptionContainer;
import org.apache.hadoop.yarn.api.protocolrecords.PreemptionContract;
import org.apache.hadoop.yarn.api.protocolrecords.PreemptionResourceRequest;
import org.apache.hadoop.yarn.api.protocolrecords.StrictPreemptionContract;
import org.apache.hadoop.yarn.api.protocolrecords.PreemptionMessage;
import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest;
import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
@ -341,10 +348,66 @@ public class ApplicationMasterService extends AbstractService implements
}
allocateResponse.setNumClusterNodes(this.rScheduler.getNumClusterNodes());
// add preemption to the allocateResponse message (if any)
allocateResponse.setPreemptionMessage(generatePreemptionMessage(allocation));
return allocateResponse;
}
}
private PreemptionMessage generatePreemptionMessage(Allocation allocation){
PreemptionMessage pMsg = null;
// assemble strict preemption request
if (allocation.getStrictContainerPreemptions() != null) {
pMsg =
recordFactory.newRecordInstance(PreemptionMessage.class);
StrictPreemptionContract pStrict =
recordFactory.newRecordInstance(StrictPreemptionContract.class);
Set<PreemptionContainer> pCont = new HashSet<PreemptionContainer>();
for (ContainerId cId : allocation.getStrictContainerPreemptions()) {
PreemptionContainer pc =
recordFactory.newRecordInstance(PreemptionContainer.class);
pc.setId(cId);
pCont.add(pc);
}
pStrict.setContainers(pCont);
pMsg.setStrictContract(pStrict);
}
// assemble negotiable preemption request
if (allocation.getResourcePreemptions() != null &&
allocation.getResourcePreemptions().size() > 0 &&
allocation.getContainerPreemptions() != null &&
allocation.getContainerPreemptions().size() > 0) {
if (pMsg == null) {
pMsg =
recordFactory.newRecordInstance(PreemptionMessage.class);
}
PreemptionContract contract =
recordFactory.newRecordInstance(PreemptionContract.class);
Set<PreemptionContainer> pCont = new HashSet<PreemptionContainer>();
for (ContainerId cId : allocation.getContainerPreemptions()) {
PreemptionContainer pc =
recordFactory.newRecordInstance(PreemptionContainer.class);
pc.setId(cId);
pCont.add(pc);
}
List<PreemptionResourceRequest> pRes = new ArrayList<PreemptionResourceRequest>();
for (ResourceRequest crr : allocation.getResourcePreemptions()) {
PreemptionResourceRequest prr =
recordFactory.newRecordInstance(PreemptionResourceRequest.class);
prr.setResourceRequest(crr);
pRes.add(prr);
}
contract.setContainers(pCont);
contract.setResourceRequest(pRes);
pMsg.setContract(contract);
}
return pMsg;
}
public void registerAppAttempt(ApplicationAttemptId attemptId) {
AllocateResponse response =
recordFactory.newRecordInstance(AllocateResponse.class);

View File

@ -19,17 +19,43 @@
package org.apache.hadoop.yarn.server.resourcemanager.scheduler;
import java.util.List;
import java.util.Set;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.factories.RecordFactory;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
public class Allocation {
private final RecordFactory recordFactory =
RecordFactoryProvider.getRecordFactory(null);
final List<Container> containers;
final Resource resourceLimit;
final Set<ContainerId> strictContainers;
final Set<ContainerId> fungibleContainers;
final List<ResourceRequest> fungibleResources;
public Allocation(List<Container> containers, Resource resourceLimit) {
this(containers, resourceLimit, null, null, null);
}
public Allocation(List<Container> containers, Resource resourceLimit,
Set<ContainerId> strictContainers) {
this(containers, resourceLimit, strictContainers, null, null);
}
public Allocation(List<Container> containers, Resource resourceLimit,
Set<ContainerId> strictContainers, Set<ContainerId> fungibleContainers,
List<ResourceRequest> fungibleResources) {
this.containers = containers;
this.resourceLimit = resourceLimit;
this.strictContainers = strictContainers;
this.fungibleContainers = fungibleContainers;
this.fungibleResources = fungibleResources;
}
public List<Container> getContainers() {
@ -40,4 +66,16 @@ public class Allocation {
return resourceLimit;
}
public Set<ContainerId> getStrictContainerPreemptions() {
return strictContainers;
}
public Set<ContainerId> getContainerPreemptions() {
return fungibleContainers;
}
public List<ResourceRequest> getResourcePreemptions() {
return fungibleResources;
}
}

View File

@ -23,6 +23,7 @@ import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@ -84,6 +85,8 @@ public class FSSchedulerApp extends SchedulerApplication {
final Map<Priority, Map<NodeId, RMContainer>> reservedContainers =
new HashMap<Priority, Map<NodeId, RMContainer>>();
final Map<RMContainer, Long> preemptionMap = new HashMap<RMContainer, Long>();
/**
* Count how many times the application has been given an opportunity
* to schedule a task at each priority. Each time the scheduler
@ -230,6 +233,9 @@ public class FSSchedulerApp extends SchedulerApplication {
Resource containerResource = rmContainer.getContainer().getResource();
queue.getMetrics().releaseResources(getUser(), 1, containerResource);
Resources.subtractFrom(currentConsumption, containerResource);
// remove from preemption map if it is completed
preemptionMap.remove(rmContainer);
}
synchronized public List<Container> pullNewlyAllocatedContainers() {
@ -572,4 +578,18 @@ public class FSSchedulerApp extends SchedulerApplication {
" priority " + priority);
allowedLocalityLevel.put(priority, level);
}
// related methods
public void addPreemption(RMContainer container, long time) {
assert preemptionMap.get(container) == null;
preemptionMap.put(container, time);
}
public Long getContainerPreemptionTime(RMContainer container) {
return preemptionMap.get(container);
}
public Set<RMContainer> getPreemptionContainers() {
return preemptionMap.keySet();
}
}

View File

@ -24,8 +24,11 @@ import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.commons.logging.Log;
@ -155,9 +158,15 @@ public class FairScheduler implements ResourceScheduler {
private Resource clusterCapacity =
RecordFactoryProvider.getRecordFactory(null).newRecordInstance(Resource.class);
// How often tasks are preempted (must be longer than a couple
// How often tasks are preempted
protected long preemptionInterval;
// ms to wait before force killing stuff (must be longer than a couple
// of heartbeats to give task-kill commands a chance to act).
protected long preemptionInterval = 15000;
protected long waitTimeBeforeKill;
// Containers whose AMs have been warned that they will be preempted soon.
private List<RMContainer> warnedContainers = new ArrayList<RMContainer>();
protected boolean preemptionEnabled;
protected boolean sizeBasedWeight; // Give larger weights to larger jobs
@ -335,34 +344,78 @@ public class FairScheduler implements ResourceScheduler {
// Sort containers into reverse order of priority
Collections.sort(runningContainers, new Comparator<RMContainer>() {
public int compare(RMContainer c1, RMContainer c2) {
return c2.getContainer().getPriority().compareTo(
int ret = c2.getContainer().getPriority().compareTo(
c1.getContainer().getPriority());
if (ret == 0) {
return c2.getContainerId().compareTo(c1.getContainerId());
}
return ret;
}
});
// Scan down the sorted list of task statuses until we've killed enough
// tasks, making sure we don't kill too many from any queue
for (RMContainer container : runningContainers) {
// Scan down the list of containers we've already warned and kill them
// if we need to. Remove any containers from the list that we don't need
// or that are no longer running.
Iterator<RMContainer> warnedIter = warnedContainers.iterator();
Set<RMContainer> preemptedThisRound = new HashSet<RMContainer>();
while (warnedIter.hasNext()) {
RMContainer container = warnedIter.next();
if (container.getState() == RMContainerState.RUNNING &&
Resources.greaterThan(RESOURCE_CALCULATOR, clusterCapacity,
toPreempt, Resources.none())) {
warnOrKillContainer(container, apps.get(container), queues.get(container));
preemptedThisRound.add(container);
Resources.subtractFrom(toPreempt, container.getContainer().getResource());
} else {
warnedIter.remove();
}
}
// Scan down the rest of the containers until we've preempted enough, making
// sure we don't preempt too many from any queue
Iterator<RMContainer> runningIter = runningContainers.iterator();
while (runningIter.hasNext() &&
Resources.greaterThan(RESOURCE_CALCULATOR, clusterCapacity,
toPreempt, Resources.none())) {
RMContainer container = runningIter.next();
FSLeafQueue sched = queues.get(container);
if (Resources.greaterThan(RESOURCE_CALCULATOR, clusterCapacity,
if (!preemptedThisRound.contains(container) &&
Resources.greaterThan(RESOURCE_CALCULATOR, clusterCapacity,
sched.getResourceUsage(), sched.getFairShare())) {
warnOrKillContainer(container, apps.get(container), sched);
warnedContainers.add(container);
Resources.subtractFrom(toPreempt, container.getContainer().getResource());
}
}
}
private void warnOrKillContainer(RMContainer container, FSSchedulerApp app,
FSLeafQueue queue) {
LOG.info("Preempting container (prio=" + container.getContainer().getPriority() +
"res=" + container.getContainer().getResource() +
") from queue " + sched.getName());
ContainerStatus status = SchedulerUtils.createAbnormalContainerStatus(
") from queue " + queue.getName());
Long time = app.getContainerPreemptionTime(container);
if (time != null) {
// if we asked for preemption more than maxWaitTimeBeforeKill ms ago,
// proceed with kill
if (time + waitTimeBeforeKill < clock.getTime()) {
ContainerStatus status =
SchedulerUtils.createAbnormalContainerStatus(
container.getContainerId(), SchedulerUtils.PREEMPTED_CONTAINER);
// TODO: Not sure if this ever actually adds this to the list of cleanup
// containers on the RMNode (see SchedulerNode.releaseContainer()).
completedContainer(container, status, RMContainerEventType.KILL);
toPreempt = Resources.subtract(toPreempt,
container.getContainer().getResource());
if (Resources.lessThanOrEqual(RESOURCE_CALCULATOR, clusterCapacity,
toPreempt, Resources.none())) {
break;
}
LOG.info("Killing container" + container +
" (after waiting for premption for " +
(clock.getTime() - time) + "ms)");
}
} else {
// track the request in the FSSchedulerApp itself
app.addPreemption(container, clock.getTime());
}
}
@ -487,11 +540,11 @@ public class FairScheduler implements ResourceScheduler {
return clusterCapacity;
}
public Clock getClock() {
public synchronized Clock getClock() {
return clock;
}
protected void setClock(Clock clock) {
protected synchronized void setClock(Clock clock) {
this.clock = clock;
}
@ -746,10 +799,18 @@ public class FairScheduler implements ResourceScheduler {
LOG.debug("allocate:" +
" applicationAttemptId=" + appAttemptId +
" #ask=" + ask.size());
LOG.debug("Preempting " + application.getPreemptionContainers().size()
+ " container(s)");
}
Set<ContainerId> preemptionContainerIds = new HashSet<ContainerId>();
for (RMContainer container : application.getPreemptionContainers()) {
preemptionContainerIds.add(container.getContainerId());
}
return new Allocation(application.pullNewlyAllocatedContainers(),
application.getHeadroom());
application.getHeadroom(), preemptionContainerIds);
}
}
@ -950,6 +1011,8 @@ public class FairScheduler implements ResourceScheduler {
assignMultiple = this.conf.getAssignMultiple();
maxAssign = this.conf.getMaxAssign();
sizeBasedWeight = this.conf.getSizeBasedWeight();
preemptionInterval = this.conf.getPreemptionInterval();
waitTimeBeforeKill = this.conf.getWaitTimeBeforeKill();
if (!initialized) {
rootMetrics = QueueMetrics.forQueue("root", null, true, conf);

View File

@ -53,6 +53,11 @@ public class FairSchedulerConfiguration extends Configuration {
protected static final String PREEMPTION = CONF_PREFIX + "preemption";
protected static final boolean DEFAULT_PREEMPTION = false;
protected static final String PREEMPTION_INTERVAL = CONF_PREFIX + "preemptionInterval";
protected static final int DEFAULT_PREEMPTION_INTERVAL = 5000;
protected static final String WAIT_TIME_BEFORE_KILL = CONF_PREFIX + "waitTimeBeforeKill";
protected static final int DEFAULT_WAIT_TIME_BEFORE_KILL = 15000;
/** Whether to assign multiple containers in one check-in. */
protected static final String ASSIGN_MULTIPLE = CONF_PREFIX + "assignmultiple";
protected static final boolean DEFAULT_ASSIGN_MULTIPLE = false;
@ -120,4 +125,12 @@ public class FairSchedulerConfiguration extends Configuration {
return get(EVENT_LOG_DIR, new File(System.getProperty("hadoop.log.dir",
"/tmp/")).getAbsolutePath() + File.separator + "fairscheduler");
}
public int getPreemptionInterval() {
return getInt(PREEMPTION_INTERVAL, DEFAULT_PREEMPTION_INTERVAL);
}
public int getWaitTimeBeforeKill() {
return getInt(WAIT_TIME_BEFORE_KILL, DEFAULT_WAIT_TIME_BEFORE_KILL);
}
}

View File

@ -30,6 +30,7 @@ import java.io.PrintWriter;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
@ -891,9 +892,16 @@ public class TestFairScheduler {
*/
public void testChoiceOfPreemptedContainers() throws Exception {
Configuration conf = createConfiguration();
conf.setLong(FairSchedulerConfiguration.PREEMPTION_INTERVAL, 5000);
conf.setLong(FairSchedulerConfiguration.WAIT_TIME_BEFORE_KILL, 10000);
conf.set(FairSchedulerConfiguration.ALLOCATION_FILE + ".allocation.file", ALLOC_FILE);
scheduler.reinitialize(conf, resourceManager.getRMContext());
MockClock clock = new MockClock();
scheduler.setClock(clock);
PrintWriter out = new PrintWriter(new FileWriter(ALLOC_FILE));
out.println("<?xml version=\"1.0\"?>");
out.println("<allocations>");
@ -988,15 +996,38 @@ public class TestFairScheduler {
Resources.createResource(2 * 1024));
assertEquals(1, scheduler.applications.get(app1).getLiveContainers().size());
assertEquals(1, scheduler.applications.get(app2).getLiveContainers().size());
assertEquals(0, scheduler.applications.get(app3).getLiveContainers().size());
assertEquals(1, scheduler.applications.get(app4).getLiveContainers().size());
assertEquals(1, scheduler.applications.get(app5).getLiveContainers().size());
// First verify we are adding containers to preemption list for the application
assertTrue(!Collections.disjoint(scheduler.applications.get(app3).getLiveContainers(),
scheduler.applications.get(app3).getPreemptionContainers()));
assertTrue(!Collections.disjoint(scheduler.applications.get(app6).getLiveContainers(),
scheduler.applications.get(app6).getPreemptionContainers()));
// Pretend 15 seconds have passed
clock.tick(15);
// Trigger a kill by insisting we want containers back
scheduler.preemptResources(scheduler.getQueueManager().getLeafQueues(),
Resources.createResource(2 * 1024));
// At this point the containers should have been killed (since we are not simulating AM)
assertEquals(0, scheduler.applications.get(app6).getLiveContainers().size());
assertEquals(0, scheduler.applications.get(app3).getLiveContainers().size());
// Trigger a kill by insisting we want containers back
scheduler.preemptResources(scheduler.getQueueManager().getLeafQueues(),
Resources.createResource(2 * 1024));
// Pretend 15 seconds have passed
clock.tick(15);
// We should be able to claw back another container from A and B each.
// Make sure it is lowest priority container.
scheduler.preemptResources(scheduler.getQueueManager().getLeafQueues(),
Resources.createResource(2 * 1024));
assertEquals(1, scheduler.applications.get(app1).getLiveContainers().size());
assertEquals(0, scheduler.applications.get(app2).getLiveContainers().size());
assertEquals(0, scheduler.applications.get(app3).getLiveContainers().size());