YARN-568. Add support for work preserving preemption to the FairScheduler.

Contributed by Carlo Curino and Sandy Ryza


git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1480837 13f79535-47bb-0310-9956-ffa450edef68
Christopher Douglas 2013-05-09 23:29:38 +00:00
parent 98f63813b4
commit a88f23aec3
7 changed files with 261 additions and 30 deletions

CHANGES.txt

@@ -160,6 +160,9 @@ Release 2.0.5-beta - UNRELEASED
    tokens for app attempt so that RM can be restarted while preserving current
    applications. (Jian He via vinodkv)

+   YARN-568. Add support for work preserving preemption to the FairScheduler.
+   (Carlo Curino and Sandy Ryza via cdouglas)
+
    YARN-598. Add virtual cores to queue metrics. (sandyr via tucu)

  OPTIMIZATIONS

ApplicationMasterService.java

@@ -21,7 +21,9 @@ package org.apache.hadoop.yarn.server.resourcemanager;
 import java.io.IOException;
 import java.net.InetSocketAddress;
 import java.util.ArrayList;
+import java.util.HashSet;
 import java.util.List;
+import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
@@ -39,6 +41,11 @@ import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
 import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
 import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest;
 import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.PreemptionContainer;
+import org.apache.hadoop.yarn.api.protocolrecords.PreemptionContract;
+import org.apache.hadoop.yarn.api.protocolrecords.PreemptionResourceRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.StrictPreemptionContract;
+import org.apache.hadoop.yarn.api.protocolrecords.PreemptionMessage;
 import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest;
 import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
@@ -341,9 +348,65 @@ public class ApplicationMasterService extends AbstractService implements
       }
       allocateResponse.setNumClusterNodes(this.rScheduler.getNumClusterNodes());
+
+      // add preemption to the allocateResponse message (if any)
+      allocateResponse.setPreemptionMessage(generatePreemptionMessage(allocation));
+
       return allocateResponse;
     }
   }
+
+  private PreemptionMessage generatePreemptionMessage(Allocation allocation) {
+    PreemptionMessage pMsg = null;
+    // assemble strict preemption request
+    if (allocation.getStrictContainerPreemptions() != null) {
+      pMsg = recordFactory.newRecordInstance(PreemptionMessage.class);
+      StrictPreemptionContract pStrict =
+          recordFactory.newRecordInstance(StrictPreemptionContract.class);
+      Set<PreemptionContainer> pCont = new HashSet<PreemptionContainer>();
+      for (ContainerId cId : allocation.getStrictContainerPreemptions()) {
+        PreemptionContainer pc =
+            recordFactory.newRecordInstance(PreemptionContainer.class);
+        pc.setId(cId);
+        pCont.add(pc);
+      }
+      pStrict.setContainers(pCont);
+      pMsg.setStrictContract(pStrict);
+    }
+
+    // assemble negotiable preemption request
+    if (allocation.getResourcePreemptions() != null &&
+        allocation.getResourcePreemptions().size() > 0 &&
+        allocation.getContainerPreemptions() != null &&
+        allocation.getContainerPreemptions().size() > 0) {
+      if (pMsg == null) {
+        pMsg = recordFactory.newRecordInstance(PreemptionMessage.class);
+      }
+      PreemptionContract contract =
+          recordFactory.newRecordInstance(PreemptionContract.class);
+      Set<PreemptionContainer> pCont = new HashSet<PreemptionContainer>();
+      for (ContainerId cId : allocation.getContainerPreemptions()) {
+        PreemptionContainer pc =
+            recordFactory.newRecordInstance(PreemptionContainer.class);
+        pc.setId(cId);
+        pCont.add(pc);
+      }
+      List<PreemptionResourceRequest> pRes =
+          new ArrayList<PreemptionResourceRequest>();
+      for (ResourceRequest crr : allocation.getResourcePreemptions()) {
+        PreemptionResourceRequest prr =
+            recordFactory.newRecordInstance(PreemptionResourceRequest.class);
+        prr.setResourceRequest(crr);
+        pRes.add(prr);
+      }
+      contract.setContainers(pCont);
+      contract.setResourceRequest(pRes);
+      pMsg.setContract(contract);
+    }
+
+    return pMsg;
+  }
+
   public void registerAppAttempt(ApplicationAttemptId attemptId) {
     AllocateResponse response =
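
For orientation, a minimal sketch of the consumer side of this message: an AM pulling the PreemptionMessage out of its AllocateResponse. The getters are assumed to mirror the setters used in generatePreemptionMessage above; checkpointAndRelease and considerYielding are hypothetical application hooks, not part of this patch.

import java.util.List;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
import org.apache.hadoop.yarn.api.protocolrecords.PreemptionContainer;
import org.apache.hadoop.yarn.api.protocolrecords.PreemptionMessage;
import org.apache.hadoop.yarn.api.protocolrecords.PreemptionResourceRequest;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ResourceRequest;

public class PreemptionAwareAM {
  // Called by the AM for every AllocateResponse it receives from the RM.
  public void onAllocateResponse(AllocateResponse response) {
    PreemptionMessage pMsg = response.getPreemptionMessage();
    if (pMsg == null) {
      return; // the RM is not asking for anything back
    }
    if (pMsg.getStrictContract() != null) {
      // Strict contract: these containers will be killed regardless, so
      // checkpoint or drain work off them right away.
      for (PreemptionContainer c : pMsg.getStrictContract().getContainers()) {
        checkpointAndRelease(c.getId());
      }
    }
    if (pMsg.getContract() != null) {
      // Negotiable contract: the AM may instead free an equivalent amount of
      // resources described by these requests to avoid the enumerated kills.
      List<PreemptionResourceRequest> asks = pMsg.getContract().getResourceRequest();
      for (PreemptionResourceRequest prr : asks) {
        considerYielding(prr.getResourceRequest());
      }
    }
  }

  private void checkpointAndRelease(ContainerId id) { /* app-specific */ }
  private void considerYielding(ResourceRequest rr) { /* app-specific */ }
}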

Allocation.java

@@ -19,17 +19,43 @@
 package org.apache.hadoop.yarn.server.resourcemanager.scheduler;

 import java.util.List;
+import java.util.Set;

 import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.api.records.ResourceRequest;
+import org.apache.hadoop.yarn.factories.RecordFactory;
+import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;

 public class Allocation {

+  private final RecordFactory recordFactory =
+      RecordFactoryProvider.getRecordFactory(null);
+
   final List<Container> containers;
   final Resource resourceLimit;
+  final Set<ContainerId> strictContainers;
+  final Set<ContainerId> fungibleContainers;
+  final List<ResourceRequest> fungibleResources;

   public Allocation(List<Container> containers, Resource resourceLimit) {
+    this(containers, resourceLimit, null, null, null);
+  }
+
+  public Allocation(List<Container> containers, Resource resourceLimit,
+      Set<ContainerId> strictContainers) {
+    this(containers, resourceLimit, strictContainers, null, null);
+  }
+
+  public Allocation(List<Container> containers, Resource resourceLimit,
+      Set<ContainerId> strictContainers, Set<ContainerId> fungibleContainers,
+      List<ResourceRequest> fungibleResources) {
     this.containers = containers;
     this.resourceLimit = resourceLimit;
+    this.strictContainers = strictContainers;
+    this.fungibleContainers = fungibleContainers;
+    this.fungibleResources = fungibleResources;
   }

   public List<Container> getContainers() {
@@ -39,5 +65,17 @@ public class Allocation {
   public Resource getResourceLimit() {
     return resourceLimit;
   }
+
+  public Set<ContainerId> getStrictContainerPreemptions() {
+    return strictContainers;
+  }
+
+  public Set<ContainerId> getContainerPreemptions() {
+    return fungibleContainers;
+  }
+
+  public List<ResourceRequest> getResourcePreemptions() {
+    return fungibleResources;
+  }
 }
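
As a quick illustration of the widened contract (not part of the patch): a scheduler's allocate() path can now piggyback preemption hints on its return value, while old two-argument callers keep working because the shorter constructors delegate with nulls, which generatePreemptionMessage treats as "nothing to preempt". Here `app` stands in for the scheduler's per-application bookkeeping object, and the three collections for whatever the preemption logic computes.

List<Container> newlyAllocated = app.pullNewlyAllocatedContainers();
Resource headroom = app.getHeadroom();

Set<ContainerId> strictIds = new HashSet<ContainerId>();        // kill unconditionally
Set<ContainerId> fungibleIds = new HashSet<ContainerId>();      // negotiable kills
List<ResourceRequest> fungibleAsks = new ArrayList<ResourceRequest>(); // equivalent asks

// Preemption-aware schedulers use the five-argument form:
return new Allocation(newlyAllocated, headroom, strictIds, fungibleIds, fungibleAsks);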

FSSchedulerApp.java

@@ -23,6 +23,7 @@ import java.util.Collection;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;

 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -86,7 +87,9 @@ public class FSSchedulerApp extends SchedulerApplication {
   final Map<Priority, Map<NodeId, RMContainer>> reservedContainers =
       new HashMap<Priority, Map<NodeId, RMContainer>>();

+  final Map<RMContainer, Long> preemptionMap = new HashMap<RMContainer, Long>();
+
   /**
    * Count how many times the application has been given an opportunity
    * to schedule a task at each priority. Each time the scheduler
@@ -233,6 +236,9 @@ public class FSSchedulerApp extends SchedulerApplication {
     Resource containerResource = rmContainer.getContainer().getResource();
     queue.getMetrics().releaseResources(getUser(), 1, containerResource);
     Resources.subtractFrom(currentConsumption, containerResource);
+
+    // remove from preemption map if it is completed
+    preemptionMap.remove(rmContainer);
   }

   synchronized public List<Container> pullNewlyAllocatedContainers() {
@@ -574,4 +580,18 @@ public class FSSchedulerApp extends SchedulerApplication {
         " priority " + priority);
     allowedLocalityLevel.put(priority, level);
   }
+
+  // Preemption related methods
+  public void addPreemption(RMContainer container, long time) {
+    assert preemptionMap.get(container) == null;
+    preemptionMap.put(container, time);
+  }
+
+  public Long getContainerPreemptionTime(RMContainer container) {
+    return preemptionMap.get(container);
+  }
+
+  public Set<RMContainer> getPreemptionContainers() {
+    return preemptionMap.keySet();
+  }
 }
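
Taken together, these three methods let the scheduler run a two-phase warn-then-kill protocol against preemptionMap. A condensed restatement of the driver logic (the real version is FairScheduler.warnOrKillContainer in the next file):

// warnedAt == null  -> first pass: record the warning so the AM gets notified
// warnedAt != null  -> later pass: kill once the grace period has elapsed
Long warnedAt = app.getContainerPreemptionTime(container);
if (warnedAt == null) {
  app.addPreemption(container, clock.getTime());
} else if (warnedAt + waitTimeBeforeKill < clock.getTime()) {
  // completedContainer(container, status, RMContainerEventType.KILL), as in FairScheduler
}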

FairScheduler.java

@@ -24,8 +24,11 @@ import java.util.Collection;
 import java.util.Collections;
 import java.util.Comparator;
 import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;

 import org.apache.commons.logging.Log;
@@ -155,10 +158,16 @@ public class FairScheduler implements ResourceScheduler {
   private Resource clusterCapacity =
       RecordFactoryProvider.getRecordFactory(null).newRecordInstance(Resource.class);

-  // How often tasks are preempted (must be longer than a couple
-  // of heartbeats to give task-kill commands a chance to act).
-  protected long preemptionInterval = 15000;
+  // How often tasks are preempted
+  protected long preemptionInterval;
+
+  // ms to wait before force killing stuff (must be longer than a couple
+  // of heartbeats to give task-kill commands a chance to act).
+  protected long waitTimeBeforeKill;
+
+  // Containers whose AMs have been warned that they will be preempted soon.
+  private List<RMContainer> warnedContainers = new ArrayList<RMContainer>();

   protected boolean preemptionEnabled;
   protected boolean sizeBasedWeight; // Give larger weights to larger jobs
   protected WeightAdjuster weightAdjuster; // Can be null for no weight adjuster
@@ -331,34 +340,78 @@ public class FairScheduler implements ResourceScheduler {
     // Sort containers into reverse order of priority
     Collections.sort(runningContainers, new Comparator<RMContainer>() {
       public int compare(RMContainer c1, RMContainer c2) {
-        return c2.getContainer().getPriority().compareTo(
+        int ret = c2.getContainer().getPriority().compareTo(
             c1.getContainer().getPriority());
+        if (ret == 0) {
+          return c2.getContainerId().compareTo(c1.getContainerId());
+        }
+        return ret;
       }
     });

-    // Scan down the sorted list of task statuses until we've killed enough
-    // tasks, making sure we don't kill too many from any queue
-    for (RMContainer container : runningContainers) {
+    // Scan down the list of containers we've already warned and kill them
+    // if we need to. Remove any containers from the list that we don't need
+    // or that are no longer running.
+    Iterator<RMContainer> warnedIter = warnedContainers.iterator();
+    Set<RMContainer> preemptedThisRound = new HashSet<RMContainer>();
+    while (warnedIter.hasNext()) {
+      RMContainer container = warnedIter.next();
+      if (container.getState() == RMContainerState.RUNNING &&
+          Resources.greaterThan(RESOURCE_CALCULATOR, clusterCapacity,
+              toPreempt, Resources.none())) {
+        warnOrKillContainer(container, apps.get(container), queues.get(container));
+        preemptedThisRound.add(container);
+        Resources.subtractFrom(toPreempt, container.getContainer().getResource());
+      } else {
+        warnedIter.remove();
+      }
+    }
+
+    // Scan down the rest of the containers until we've preempted enough, making
+    // sure we don't preempt too many from any queue
+    Iterator<RMContainer> runningIter = runningContainers.iterator();
+    while (runningIter.hasNext() &&
+        Resources.greaterThan(RESOURCE_CALCULATOR, clusterCapacity,
+            toPreempt, Resources.none())) {
+      RMContainer container = runningIter.next();
       FSLeafQueue sched = queues.get(container);
-      if (Resources.greaterThan(RESOURCE_CALCULATOR, clusterCapacity,
-          sched.getResourceUsage(), sched.getFairShare())) {
-        LOG.info("Preempting container (prio=" + container.getContainer().getPriority() +
-            "res=" + container.getContainer().getResource() +
-            ") from queue " + sched.getName());
-        ContainerStatus status = SchedulerUtils.createAbnormalContainerStatus(
-            container.getContainerId(), SchedulerUtils.PREEMPTED_CONTAINER);
-
-        // TODO: Not sure if this ever actually adds this to the list of cleanup
-        // containers on the RMNode (see SchedulerNode.releaseContainer()).
-        completedContainer(container, status, RMContainerEventType.KILL);
-
-        toPreempt = Resources.subtract(toPreempt,
-            container.getContainer().getResource());
-        if (Resources.lessThanOrEqual(RESOURCE_CALCULATOR, clusterCapacity,
-            toPreempt, Resources.none())) {
-          break;
-        }
+      if (!preemptedThisRound.contains(container) &&
+          Resources.greaterThan(RESOURCE_CALCULATOR, clusterCapacity,
+              sched.getResourceUsage(), sched.getFairShare())) {
+        warnOrKillContainer(container, apps.get(container), sched);
+        warnedContainers.add(container);
+        Resources.subtractFrom(toPreempt, container.getContainer().getResource());
       }
     }
   }
+
+  private void warnOrKillContainer(RMContainer container, FSSchedulerApp app,
+      FSLeafQueue queue) {
+    LOG.info("Preempting container (prio=" + container.getContainer().getPriority() +
+        "res=" + container.getContainer().getResource() +
+        ") from queue " + queue.getName());
+
+    Long time = app.getContainerPreemptionTime(container);
+    if (time != null) {
+      // if we asked for preemption more than waitTimeBeforeKill ms ago,
+      // proceed with kill
+      if (time + waitTimeBeforeKill < clock.getTime()) {
+        ContainerStatus status =
+            SchedulerUtils.createAbnormalContainerStatus(
+                container.getContainerId(), SchedulerUtils.PREEMPTED_CONTAINER);
+
+        // TODO: Not sure if this ever actually adds this to the list of cleanup
+        // containers on the RMNode (see SchedulerNode.releaseContainer()).
+        completedContainer(container, status, RMContainerEventType.KILL);
+        LOG.info("Killing container " + container +
+            " (after waiting for preemption for " +
+            (clock.getTime() - time) + "ms)");
+      }
+    } else {
+      // track the request in the FSSchedulerApp itself
+      app.addPreemption(container, clock.getTime());
+    }
+  }
@@ -483,11 +536,11 @@ public class FairScheduler implements ResourceScheduler {
     return clusterCapacity;
   }

-  public Clock getClock() {
+  public synchronized Clock getClock() {
     return clock;
   }

-  protected void setClock(Clock clock) {
+  protected synchronized void setClock(Clock clock) {
     this.clock = clock;
   }
@@ -745,10 +798,18 @@ public class FairScheduler implements ResourceScheduler {
       LOG.debug("allocate:" +
           " applicationAttemptId=" + appAttemptId +
           " #ask=" + ask.size());
+
+      LOG.debug("Preempting " + application.getPreemptionContainers().size()
+          + " container(s)");
     }
+
+    Set<ContainerId> preemptionContainerIds = new HashSet<ContainerId>();
+    for (RMContainer container : application.getPreemptionContainers()) {
+      preemptionContainerIds.add(container.getContainerId());
+    }
+
     return new Allocation(application.pullNewlyAllocatedContainers(),
-        application.getHeadroom());
+        application.getHeadroom(), preemptionContainerIds);
   }
 }
@@ -963,7 +1024,9 @@ public class FairScheduler implements ResourceScheduler {
     assignMultiple = this.conf.getAssignMultiple();
     maxAssign = this.conf.getMaxAssign();
     sizeBasedWeight = this.conf.getSizeBasedWeight();
+    preemptionInterval = this.conf.getPreemptionInterval();
+    waitTimeBeforeKill = this.conf.getWaitTimeBeforeKill();
+
     if (!initialized) {
       rootMetrics = QueueMetrics.forQueue("root", null, true, conf);
       this.rmContext = rmContext;
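
To make the interplay of the two timers concrete, here is a rough timeline under the defaults added in the next file (preemptionInterval = 5000 ms between preemption passes, waitTimeBeforeKill = 15000 ms of grace), assuming the cluster stays starved so that a preemption pass fires each interval and still wants resources back:

t = 0 ms       pass 1: container C is selected; addPreemption(C, 0) records the
               warning, and the AM sees C in its next PreemptionMessage
t = 5000 ms    pass 2: C still running, 0 + 15000 > 5000, so C is only re-warned
t = 10000 ms   pass 3: still inside the grace period, re-warned again
t = 15000+ ms  next pass: 0 + 15000 < now, so warnOrKillContainer kills C via
               completedContainer(..., RMContainerEventType.KILL)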

FairSchedulerConfiguration.java

@@ -55,6 +55,11 @@ public class FairSchedulerConfiguration extends Configuration {
   /** Whether preemption is enabled. */
   protected static final String PREEMPTION = CONF_PREFIX + "preemption";
   protected static final boolean DEFAULT_PREEMPTION = false;
+
+  protected static final String PREEMPTION_INTERVAL = CONF_PREFIX + "preemptionInterval";
+  protected static final int DEFAULT_PREEMPTION_INTERVAL = 5000;
+  protected static final String WAIT_TIME_BEFORE_KILL = CONF_PREFIX + "waitTimeBeforeKill";
+  protected static final int DEFAULT_WAIT_TIME_BEFORE_KILL = 15000;

   /** Whether to assign multiple containers in one check-in. */
   protected static final String ASSIGN_MULTIPLE = CONF_PREFIX + "assignmultiple";
@@ -123,4 +128,12 @@ public class FairSchedulerConfiguration extends Configuration {
     return get(EVENT_LOG_DIR, new File(System.getProperty("hadoop.log.dir",
         "/tmp/")).getAbsolutePath() + File.separator + "fairscheduler");
   }
+
+  public int getPreemptionInterval() {
+    return getInt(PREEMPTION_INTERVAL, DEFAULT_PREEMPTION_INTERVAL);
+  }
+
+  public int getWaitTimeBeforeKill() {
+    return getInt(WAIT_TIME_BEFORE_KILL, DEFAULT_WAIT_TIME_BEFORE_KILL);
+  }
 }
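
Assuming CONF_PREFIX is the fair scheduler's usual "yarn.scheduler.fair." prefix (its definition is not shown in this hunk), a minimal sketch of wiring the new knobs up in code, mirroring what the test below does with the class constants:

import org.apache.hadoop.conf.Configuration;

Configuration conf = new Configuration();
// Key names assume CONF_PREFIX = "yarn.scheduler.fair."
conf.setBoolean("yarn.scheduler.fair.preemption", true);       // enable preemption
conf.setLong("yarn.scheduler.fair.preemptionInterval", 5000);  // ms between preemption passes
conf.setLong("yarn.scheduler.fair.waitTimeBeforeKill", 15000); // grace before force-kill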

TestFairScheduler.java

@@ -30,6 +30,7 @@ import java.io.PrintWriter;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
+import java.util.Collections;
 import java.util.List;
 import java.util.Map;
@@ -900,9 +901,16 @@ public class TestFairScheduler {
    */
   public void testChoiceOfPreemptedContainers() throws Exception {
     Configuration conf = createConfiguration();
+    conf.setLong(FairSchedulerConfiguration.PREEMPTION_INTERVAL, 5000);
+    conf.setLong(FairSchedulerConfiguration.WAIT_TIME_BEFORE_KILL, 10000);
     conf.set(FairSchedulerConfiguration.ALLOCATION_FILE + ".allocation.file", ALLOC_FILE);
     scheduler.reinitialize(conf, resourceManager.getRMContext());
+
+    MockClock clock = new MockClock();
+    scheduler.setClock(clock);
+
     PrintWriter out = new PrintWriter(new FileWriter(ALLOC_FILE));
     out.println("<?xml version=\"1.0\"?>");
     out.println("<allocations>");
@@ -997,15 +1005,38 @@ public class TestFairScheduler {
         Resources.createResource(2 * 1024));
     assertEquals(1, scheduler.applications.get(app1).getLiveContainers().size());
     assertEquals(1, scheduler.applications.get(app2).getLiveContainers().size());
-    assertEquals(0, scheduler.applications.get(app3).getLiveContainers().size());
     assertEquals(1, scheduler.applications.get(app4).getLiveContainers().size());
     assertEquals(1, scheduler.applications.get(app5).getLiveContainers().size());
+
+    // First verify we are adding containers to preemption list for the application
+    assertTrue(!Collections.disjoint(scheduler.applications.get(app3).getLiveContainers(),
+        scheduler.applications.get(app3).getPreemptionContainers()));
+    assertTrue(!Collections.disjoint(scheduler.applications.get(app6).getLiveContainers(),
+        scheduler.applications.get(app6).getPreemptionContainers()));
+
+    // Pretend 15 seconds have passed
+    clock.tick(15);
+
+    // Trigger a kill by insisting we want containers back
+    scheduler.preemptResources(scheduler.getQueueManager().getLeafQueues(),
+        Resources.createResource(2 * 1024));
+
+    // At this point the containers should have been killed (since we are not simulating AM)
     assertEquals(0, scheduler.applications.get(app6).getLiveContainers().size());
+    assertEquals(0, scheduler.applications.get(app3).getLiveContainers().size());
+
+    // Trigger a kill by insisting we want containers back
+    scheduler.preemptResources(scheduler.getQueueManager().getLeafQueues(),
+        Resources.createResource(2 * 1024));
+
+    // Pretend 15 seconds have passed
+    clock.tick(15);
+
     // We should be able to claw back another container from A and B each.
     // Make sure it is lowest priority container.
     scheduler.preemptResources(scheduler.getQueueManager().getLeafQueues(),
         Resources.createResource(2 * 1024));
     assertEquals(1, scheduler.applications.get(app1).getLiveContainers().size());
     assertEquals(0, scheduler.applications.get(app2).getLiveContainers().size());
     assertEquals(0, scheduler.applications.get(app3).getLiveContainers().size());