YARN-2022 Preempting an Application Master container can be kept as least priority when multiple applications are marked for preemption by ProportionalCapacityPreemptionPolicy (Sunil G via mayank)

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1607227 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Mayank Bansal 2014-07-02 01:54:47 +00:00
parent 075ff276ca
commit 03a25d2cc1
9 changed files with 282 additions and 15 deletions

View File

@ -56,6 +56,10 @@ Release 2.5.0 - UNRELEASED
IMPROVEMENTS
YARN-2022 Preempting an Application Master container can be kept as least priority
when multiple applications are marked for preemption by
ProportionalCapacityPreemptionPolicy (Sunil G via mayank)
YARN-1479. Invalid NaN values in Hadoop REST API JSON response (Chen He via
jeagles)

View File

@ -111,7 +111,7 @@ public class ProportionalCapacityPreemptionPolicy implements SchedulingEditPolic
public static final String NATURAL_TERMINATION_FACTOR =
"yarn.resourcemanager.monitor.capacity.preemption.natural_termination_factor";
//the dispatcher to send preempt and kill events
// the dispatcher to send preempt and kill events
public EventHandler<ContainerPreemptEvent> dispatcher;
private final Clock clock;
@ -437,8 +437,9 @@ public class ProportionalCapacityPreemptionPolicy implements SchedulingEditPolic
private Map<ApplicationAttemptId,Set<RMContainer>> getContainersToPreempt(
List<TempQueue> queues, Resource clusterResource) {
Map<ApplicationAttemptId,Set<RMContainer>> list =
Map<ApplicationAttemptId,Set<RMContainer>> preemptMap =
new HashMap<ApplicationAttemptId,Set<RMContainer>>();
List<RMContainer> skippedAMContainerlist = new ArrayList<RMContainer>();
for (TempQueue qT : queues) {
// we act only if we are violating balance by more than
@ -449,26 +450,83 @@ public class ProportionalCapacityPreemptionPolicy implements SchedulingEditPolic
// accounts for natural termination of containers
Resource resToObtain =
Resources.multiply(qT.toBePreempted, naturalTerminationFactor);
Resource skippedAMSize = Resource.newInstance(0, 0);
// lock the leafqueue while we scan applications and unreserve
synchronized(qT.leafQueue) {
synchronized (qT.leafQueue) {
NavigableSet<FiCaSchedulerApp> ns =
(NavigableSet<FiCaSchedulerApp>) qT.leafQueue.getApplications();
Iterator<FiCaSchedulerApp> desc = ns.descendingIterator();
qT.actuallyPreempted = Resources.clone(resToObtain);
while (desc.hasNext()) {
FiCaSchedulerApp fc = desc.next();
if (Resources.lessThanOrEqual(rc, clusterResource,
resToObtain, Resources.none())) {
if (Resources.lessThanOrEqual(rc, clusterResource, resToObtain,
Resources.none())) {
break;
}
list.put(fc.getApplicationAttemptId(),
preemptFrom(fc, clusterResource, resToObtain));
preemptMap.put(
fc.getApplicationAttemptId(),
preemptFrom(fc, clusterResource, resToObtain,
skippedAMContainerlist, skippedAMSize));
}
Resource maxAMCapacityForThisQueue = Resources.multiply(
Resources.multiply(clusterResource,
qT.leafQueue.getAbsoluteCapacity()),
qT.leafQueue.getMaxAMResourcePerQueuePercent());
// Can try preempting AMContainers (still saving atmost
// maxAMCapacityForThisQueue AMResource's) if more resources are
// required to be preempted from this Queue.
preemptAMContainers(clusterResource, preemptMap,
skippedAMContainerlist, resToObtain, skippedAMSize,
maxAMCapacityForThisQueue);
}
}
}
return preemptMap;
}
return list;
/**
* As more resources are needed for preemption, saved AMContainers has to be
* rescanned. Such AMContainers can be preempted based on resToObtain, but
* maxAMCapacityForThisQueue resources will be still retained.
*
* @param clusterResource
* @param preemptMap
* @param skippedAMContainerlist
* @param resToObtain
* @param skippedAMSize
* @param maxAMCapacityForThisQueue
*/
private void preemptAMContainers(Resource clusterResource,
Map<ApplicationAttemptId, Set<RMContainer>> preemptMap,
List<RMContainer> skippedAMContainerlist, Resource resToObtain,
Resource skippedAMSize, Resource maxAMCapacityForThisQueue) {
for (RMContainer c : skippedAMContainerlist) {
// Got required amount of resources for preemption, can stop now
if (Resources.lessThanOrEqual(rc, clusterResource, resToObtain,
Resources.none())) {
break;
}
// Once skippedAMSize reaches down to maxAMCapacityForThisQueue,
// container selection iteration for preemption will be stopped.
if (Resources.lessThanOrEqual(rc, clusterResource, skippedAMSize,
maxAMCapacityForThisQueue)) {
break;
}
Set<RMContainer> contToPrempt = preemptMap.get(c
.getApplicationAttemptId());
if (null == contToPrempt) {
contToPrempt = new HashSet<RMContainer>();
preemptMap.put(c.getApplicationAttemptId(), contToPrempt);
}
contToPrempt.add(c);
Resources.subtractFrom(resToObtain, c.getContainer().getResource());
Resources.subtractFrom(skippedAMSize, c.getContainer()
.getResource());
}
skippedAMContainerlist.clear();
}
/**
@ -480,8 +538,9 @@ public class ProportionalCapacityPreemptionPolicy implements SchedulingEditPolic
* @param rsrcPreempt
* @return
*/
private Set<RMContainer> preemptFrom(
FiCaSchedulerApp app, Resource clusterResource, Resource rsrcPreempt) {
private Set<RMContainer> preemptFrom(FiCaSchedulerApp app,
Resource clusterResource, Resource rsrcPreempt,
List<RMContainer> skippedAMContainerlist, Resource skippedAMSize) {
Set<RMContainer> ret = new HashSet<RMContainer>();
ApplicationAttemptId appId = app.getApplicationAttemptId();
@ -513,6 +572,12 @@ public class ProportionalCapacityPreemptionPolicy implements SchedulingEditPolic
rsrcPreempt, Resources.none())) {
return ret;
}
// Skip AM Container from preemption for now.
if (c.isAMContainer()) {
skippedAMContainerlist.add(c);
Resources.addTo(skippedAMSize, c.getContainer().getResource());
continue;
}
ret.add(c);
Resources.subtractFrom(rsrcPreempt, c.getContainer().getResource());
}

View File

@ -84,6 +84,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAt
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptStatusupdateEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptUnregistrationEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptUpdateSavedEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerImpl;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAttemptAddedSchedulerEvent;
@ -833,6 +834,9 @@ public class RMAppAttemptImpl implements RMAppAttempt, Recoverable {
// Set the masterContainer
appAttempt.setMasterContainer(amContainerAllocation.getContainers()
.get(0));
RMContainerImpl rmMasterContainer = (RMContainerImpl)appAttempt.scheduler
.getRMContainer(appAttempt.getMasterContainer().getId());
rmMasterContainer.setAMContainer(true);
// The node set in NMTokenSecrentManager is used for marking whether the
// NMToken has been issued for this node to the AM.
// When AM container was allocated to RM itself, the node which allocates

View File

@ -72,4 +72,6 @@ public interface RMContainer extends EventHandler<RMContainerEvent> {
ContainerReport createContainerReport();
boolean isAMContainer();
}

View File

@ -155,6 +155,7 @@ public class RMContainerImpl implements RMContainer {
private long creationTime;
private long finishTime;
private ContainerStatus finishedStatus;
private boolean isAMContainer;
public RMContainerImpl(Container container,
ApplicationAttemptId appAttemptId, NodeId nodeId, String user,
@ -176,6 +177,7 @@ public class RMContainerImpl implements RMContainer {
this.rmContext = rmContext;
this.eventHandler = rmContext.getDispatcher().getEventHandler();
this.containerAllocationExpirer = rmContext.getContainerAllocationExpirer();
this.isAMContainer = false;
ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
this.readLock = lock.readLock();
@ -313,6 +315,25 @@ public class RMContainerImpl implements RMContainer {
return containerId.toString();
}
@Override
public boolean isAMContainer() {
try {
readLock.lock();
return isAMContainer;
} finally {
readLock.unlock();
}
}
public void setAMContainer(boolean isAMContainer) {
try {
writeLock.lock();
this.isAMContainer = isAMContainer;
} finally {
writeLock.unlock();
}
}
@Override
public void handle(RMContainerEvent event) {
LOG.debug("Processing " + event.getContainerId() + " of type " + event.getType());
@ -490,5 +511,4 @@ public class RMContainerImpl implements RMContainer {
}
return containerReport;
}
}

View File

@ -39,6 +39,7 @@ import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.server.api.protocolrecords.NMContainerStatus;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerImpl;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerRecoverEvent;
@ -242,6 +243,20 @@ public abstract class AbstractYarnScheduler
// recover scheduler attempt
schedulerAttempt.recoverContainer(rmContainer);
// set master container for the current running AMContainer for this
// attempt.
RMAppAttempt appAttempt = rmApp.getCurrentAppAttempt();
if (appAttempt != null) {
Container masterContainer = appAttempt.getMasterContainer();
// Mark current running AMContainer's RMContainer based on the master
// container ID stored in AppAttempt.
if (masterContainer != null
&& masterContainer.getId().equals(rmContainer.getContainerId())) {
((RMContainerImpl)rmContainer).setAMContainer(true);
}
}
}
}

View File

@ -62,6 +62,7 @@ import org.apache.log4j.Level;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
@ -565,6 +566,43 @@ public class TestWorkPreservingRMRestart {
rm2.waitForState(am0.getApplicationAttemptId(), RMAppAttemptState.RUNNING);
}
@Test (timeout = 30000)
public void testAMContainerStatusWithRMRestart() throws Exception {
MemoryRMStateStore memStore = new MemoryRMStateStore();
memStore.init(conf);
rm1 = new MockRM(conf, memStore);
rm1.start();
MockNM nm1 =
new MockNM("127.0.0.1:1234", 8192, rm1.getResourceTrackerService());
nm1.registerNode();
RMApp app1_1 = rm1.submitApp(1024);
MockAM am1_1 = MockRM.launchAndRegisterAM(app1_1, rm1, nm1);
RMAppAttempt attempt0 = app1_1.getCurrentAppAttempt();
AbstractYarnScheduler scheduler =
((AbstractYarnScheduler) rm1.getResourceScheduler());
Assert.assertTrue(scheduler.getRMContainer(
attempt0.getMasterContainer().getId()).isAMContainer());
// Re-start RM
rm2 = new MockRM(conf, memStore);
rm2.start();
nm1.setResourceTrackerService(rm2.getResourceTrackerService());
List<NMContainerStatus> am1_1Containers =
createNMContainerStatusForApp(am1_1);
nm1.registerNode(am1_1Containers, null);
// Wait for RM to settle down on recovering containers;
waitForNumContainersToRecover(2, rm2, am1_1.getApplicationAttemptId());
scheduler = ((AbstractYarnScheduler) rm2.getResourceScheduler());
Assert.assertTrue(scheduler.getRMContainer(
attempt0.getMasterContainer().getId()).isAMContainer());
}
private void asserteMetrics(QueueMetrics qm, int appsSubmitted,
int appsPending, int appsRunning, int appsCompleted,
int allocatedContainers, int availableMB, int availableVirtualCores,

View File

@ -80,6 +80,8 @@ public class TestProportionalCapacityPreemptionPolicy {
static final long TS = 3141592653L;
int appAlloc = 0;
boolean setAMContainer = false;
float setAMResourcePercent = 0.0f;
Random rand = null;
Clock mClock = null;
Configuration conf = null;
@ -467,6 +469,107 @@ public class TestProportionalCapacityPreemptionPolicy {
fail("Failed to find SchedulingMonitor service, please check what happened");
}
@Test
public void testSkipAMContainer() {
int[][] qData = new int[][] {
// / A B
{ 100, 50, 50 }, // abs
{ 100, 100, 100 }, // maxcap
{ 100, 100, 0 }, // used
{ 70, 20, 50 }, // pending
{ 0, 0, 0 }, // reserved
{ 5, 4, 1 }, // apps
{ -1, 1, 1 }, // req granularity
{ 2, 0, 0 }, // subqueues
};
setAMContainer = true;
ProportionalCapacityPreemptionPolicy policy = buildPolicy(qData);
policy.editSchedule();
// By skipping AM Container, all other 24 containers of appD will be
// preempted
verify(mDisp, times(24)).handle(argThat(new IsPreemptionRequestFor(appD)));
// By skipping AM Container, all other 24 containers of appC will be
// preempted
verify(mDisp, times(24)).handle(argThat(new IsPreemptionRequestFor(appC)));
// Since AM containers of appC and appD are saved, 2 containers from appB
// has to be preempted.
verify(mDisp, times(2)).handle(argThat(new IsPreemptionRequestFor(appB)));
setAMContainer = false;
}
@Test
public void testPreemptSkippedAMContainers() {
int[][] qData = new int[][] {
// / A B
{ 100, 10, 90 }, // abs
{ 100, 100, 100 }, // maxcap
{ 100, 100, 0 }, // used
{ 70, 20, 90 }, // pending
{ 0, 0, 0 }, // reserved
{ 5, 4, 1 }, // apps
{ -1, 5, 5 }, // req granularity
{ 2, 0, 0 }, // subqueues
};
setAMContainer = true;
ProportionalCapacityPreemptionPolicy policy = buildPolicy(qData);
policy.editSchedule();
// All 5 containers of appD will be preempted including AM container.
verify(mDisp, times(5)).handle(argThat(new IsPreemptionRequestFor(appD)));
// All 5 containers of appC will be preempted including AM container.
verify(mDisp, times(5)).handle(argThat(new IsPreemptionRequestFor(appC)));
// By skipping AM Container, all other 4 containers of appB will be
// preempted
verify(mDisp, times(4)).handle(argThat(new IsPreemptionRequestFor(appB)));
// By skipping AM Container, all other 4 containers of appA will be
// preempted
verify(mDisp, times(4)).handle(argThat(new IsPreemptionRequestFor(appA)));
setAMContainer = false;
}
@Test
public void testAMResourcePercentForSkippedAMContainers() {
int[][] qData = new int[][] {
// / A B
{ 100, 10, 90 }, // abs
{ 100, 100, 100 }, // maxcap
{ 100, 100, 0 }, // used
{ 70, 20, 90 }, // pending
{ 0, 0, 0 }, // reserved
{ 5, 4, 1 }, // apps
{ -1, 5, 5 }, // req granularity
{ 2, 0, 0 }, // subqueues
};
setAMContainer = true;
setAMResourcePercent = 0.5f;
ProportionalCapacityPreemptionPolicy policy = buildPolicy(qData);
policy.editSchedule();
// AMResoucePercent is 50% of cluster and maxAMCapacity will be 5Gb.
// Total used AM container size is 20GB, hence 2 AM container has
// to be preempted as Queue Capacity is 10Gb.
verify(mDisp, times(5)).handle(argThat(new IsPreemptionRequestFor(appD)));
// Including AM Container, all other 4 containers of appC will be
// preempted
verify(mDisp, times(5)).handle(argThat(new IsPreemptionRequestFor(appC)));
// By skipping AM Container, all other 4 containers of appB will be
// preempted
verify(mDisp, times(4)).handle(argThat(new IsPreemptionRequestFor(appB)));
// By skipping AM Container, all other 4 containers of appA will be
// preempted
verify(mDisp, times(4)).handle(argThat(new IsPreemptionRequestFor(appA)));
setAMContainer = false;
}
static class IsPreemptionRequestFor
extends ArgumentMatcher<ContainerPreemptEvent> {
private final ApplicationAttemptId appAttId;
@ -583,6 +686,9 @@ public class TestProportionalCapacityPreemptionPolicy {
}
}
when(lq.getApplications()).thenReturn(qApps);
if(setAMResourcePercent != 0.0f){
when(lq.getMaxAMResourcePerQueuePercent()).thenReturn(setAMResourcePercent);
}
p.getChildQueues().add(lq);
return lq;
}
@ -607,7 +713,11 @@ public class TestProportionalCapacityPreemptionPolicy {
List<RMContainer> cLive = new ArrayList<RMContainer>();
for (int i = 0; i < used; i += gran) {
if(setAMContainer && i == 0){
cLive.add(mockContainer(appAttId, cAlloc, unit, 0));
}else{
cLive.add(mockContainer(appAttId, cAlloc, unit, 1));
}
++cAlloc;
}
when(app.getLiveContainers()).thenReturn(cLive);
@ -623,6 +733,10 @@ public class TestProportionalCapacityPreemptionPolicy {
RMContainer mC = mock(RMContainer.class);
when(mC.getContainerId()).thenReturn(cId);
when(mC.getContainer()).thenReturn(c);
when(mC.getApplicationAttemptId()).thenReturn(appAttId);
if(0 == priority){
when(mC.isAMContainer()).thenReturn(true);
}
return mC;
}

View File

@ -86,6 +86,8 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAt
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptUnregistrationEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptUpdateSavedEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.ContainerAllocationExpirer;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerImpl;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAttemptAddedSchedulerEvent;
@ -600,6 +602,9 @@ public class TestRMAppAttemptTransitions {
any(List.class),
any(List.class))).
thenReturn(allocation);
RMContainer rmContainer = mock(RMContainerImpl.class);
when(scheduler.getRMContainer(container.getId())).
thenReturn(rmContainer);
applicationAttempt.handle(
new RMAppAttemptContainerAllocatedEvent(