YARN-8804. resourceLimits may be wrongly calculated when leaf-queue is blocked in cluster with 3+ level queues. Contributed by Tao Yang
(cherry picked from commit 6b988d821e
)
This commit is contained in:
parent
3d67080a45
commit
1b0a110501
|
@ -38,6 +38,9 @@ public class ResourceLimits {
|
|||
// containers.
|
||||
private volatile Resource headroom;
|
||||
|
||||
// How much resource should be reserved for high-priority blocked queues
|
||||
private Resource blockedHeadroom;
|
||||
|
||||
private boolean allowPreempt = false;
|
||||
|
||||
public ResourceLimits(Resource limit) {
|
||||
|
@ -81,4 +84,25 @@ public class ResourceLimits {
|
|||
public void setIsAllowPreemption(boolean allowPreempt) {
|
||||
this.allowPreempt = allowPreempt;
|
||||
}
|
||||
|
||||
public void addBlockedHeadroom(Resource resource) {
|
||||
if (blockedHeadroom == null) {
|
||||
blockedHeadroom = Resource.newInstance(0, 0);
|
||||
}
|
||||
Resources.addTo(blockedHeadroom, resource);
|
||||
}
|
||||
|
||||
public Resource getBlockedHeadroom() {
|
||||
if (blockedHeadroom == null) {
|
||||
return Resources.none();
|
||||
}
|
||||
return blockedHeadroom;
|
||||
}
|
||||
|
||||
public Resource getNetLimit() {
|
||||
if (blockedHeadroom != null) {
|
||||
return Resources.subtract(limit, blockedHeadroom);
|
||||
}
|
||||
return limit;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -714,7 +714,6 @@ public class ParentQueue extends AbstractCSQueue {
|
|||
SchedulingMode schedulingMode) {
|
||||
CSAssignment assignment = CSAssignment.NULL_ASSIGNMENT;
|
||||
|
||||
Resource parentLimits = limits.getLimit();
|
||||
printChildQueues();
|
||||
|
||||
// Try to assign to most 'under-served' sub-queue
|
||||
|
@ -728,9 +727,9 @@ public class ParentQueue extends AbstractCSQueue {
|
|||
|
||||
// Get ResourceLimits of child queue before assign containers
|
||||
ResourceLimits childLimits =
|
||||
getResourceLimitsOfChild(childQueue, cluster, parentLimits,
|
||||
getResourceLimitsOfChild(childQueue, cluster, limits.getNetLimit(),
|
||||
ps.getPartition());
|
||||
|
||||
|
||||
CSAssignment childAssignment = childQueue.assignContainers(cluster, ps,
|
||||
childLimits, schedulingMode);
|
||||
if(LOG.isDebugEnabled()) {
|
||||
|
@ -750,16 +749,21 @@ public class ParentQueue extends AbstractCSQueue {
|
|||
CSAssignment.SkippedType.QUEUE_LIMIT) {
|
||||
assignment = childAssignment;
|
||||
}
|
||||
Resource blockedHeadroom = null;
|
||||
if (childQueue instanceof LeafQueue) {
|
||||
blockedHeadroom = childLimits.getHeadroom();
|
||||
} else {
|
||||
blockedHeadroom = childLimits.getBlockedHeadroom();
|
||||
}
|
||||
Resource resourceToSubtract = Resources.max(resourceCalculator,
|
||||
cluster, childLimits.getHeadroom(), Resources.none());
|
||||
cluster, blockedHeadroom, Resources.none());
|
||||
limits.addBlockedHeadroom(resourceToSubtract);
|
||||
if(LOG.isDebugEnabled()) {
|
||||
LOG.debug("Decrease parentLimits " + parentLimits +
|
||||
LOG.debug("Decrease parentLimits " + limits.getLimit() +
|
||||
" for " + this.getQueueName() + " by " +
|
||||
resourceToSubtract + " as childQueue=" +
|
||||
childQueue.getQueueName() + " is blocked");
|
||||
}
|
||||
parentLimits = Resources.subtract(parentLimits,
|
||||
resourceToSubtract);
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -58,6 +58,8 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeRemoved
|
|||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateSchedulerEvent;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager;
|
||||
import org.apache.hadoop.yarn.server.utils.BuilderUtils;
|
||||
import org.apache.hadoop.yarn.util.resource.DominantResourceCalculator;
|
||||
import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
|
||||
import org.apache.hadoop.yarn.util.resource.Resources;
|
||||
import org.junit.Assert;
|
||||
import org.junit.Before;
|
||||
|
@ -982,4 +984,82 @@ public class TestContainerAllocation {
|
|||
Assert.assertEquals(2, lq.getMetrics().getAppsPending());
|
||||
rm1.close();
|
||||
}
|
||||
|
||||
@Test(timeout = 60000)
|
||||
public void testAllocationCannotBeBlockedWhenFormerQueueReachedItsLimit()
|
||||
throws Exception {
|
||||
/**
|
||||
* Queue structure:
|
||||
* <pre>
|
||||
* Root
|
||||
* / | \
|
||||
* a b c
|
||||
* 10 20 70
|
||||
* | \
|
||||
* c1 c2
|
||||
* 10(max=10) 90
|
||||
* </pre>
|
||||
* Test case:
|
||||
* Create a cluster with two nodes whose node resource both are
|
||||
* <10GB, 10core>, create queues as above, among them max-capacity of "c1"
|
||||
* is 10 and others are all 100, so that max-capacity of queue "c1" is
|
||||
* <2GB, 2core>,
|
||||
* submit app1 to queue "c1" and launch am1(resource=<1GB, 1 core>) on nm1,
|
||||
* submit app2 to queue "b" and launch am2(resource=<1GB, 1 core>) on nm1,
|
||||
* app1 and app2 both ask one <2GB, 1core> containers
|
||||
*
|
||||
* Now queue "c" has lower capacity percentage than queue "b", the
|
||||
* allocation sequence will be "a" -> "c" -> "b", queue "c1" has reached
|
||||
* queue limit so that requests of app1 should be pending
|
||||
*
|
||||
* After nm1 do 1 heartbeat, scheduler should allocate one container for
|
||||
* app2 on nm1.
|
||||
*/
|
||||
CapacitySchedulerConfiguration newConf =
|
||||
(CapacitySchedulerConfiguration) TestUtils
|
||||
.getConfigurationWithMultipleQueues(conf);
|
||||
newConf.setQueues(CapacitySchedulerConfiguration.ROOT + ".c",
|
||||
new String[] { "c1", "c2" });
|
||||
newConf.setCapacity(CapacitySchedulerConfiguration.ROOT + ".c.c1", 10);
|
||||
newConf
|
||||
.setMaximumCapacity(CapacitySchedulerConfiguration.ROOT + ".c.c1", 10);
|
||||
newConf.setCapacity(CapacitySchedulerConfiguration.ROOT + ".c.c2", 90);
|
||||
newConf.setClass(CapacitySchedulerConfiguration.RESOURCE_CALCULATOR_CLASS,
|
||||
DominantResourceCalculator.class, ResourceCalculator.class);
|
||||
|
||||
MockRM rm1 = new MockRM(newConf);
|
||||
|
||||
RMNodeLabelsManager nodeLabelsManager = new NullRMNodeLabelsManager();
|
||||
nodeLabelsManager.init(newConf);
|
||||
rm1.getRMContext().setNodeLabelManager(nodeLabelsManager);
|
||||
rm1.start();
|
||||
MockNM nm1 = rm1.registerNode("h1:1234", 10 * GB);
|
||||
MockNM nm2 = rm1.registerNode("h2:1234", 10 * GB);
|
||||
|
||||
// launch an app to queue "c1", AM container should be launched on nm1
|
||||
RMApp app1 = rm1.submitApp(1 * GB, "app", "user", null, "c1");
|
||||
MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1);
|
||||
|
||||
// launch another app to queue "b", AM container should be launched on nm1
|
||||
RMApp app2 = rm1.submitApp(1 * GB, "app", "user", null, "b");
|
||||
MockAM am2 = MockRM.launchAndRegisterAM(app2, rm1, nm1);
|
||||
|
||||
am1.allocate("*", 2 * GB, 1, new ArrayList<ContainerId>());
|
||||
am2.allocate("*", 2 * GB, 1, new ArrayList<ContainerId>());
|
||||
|
||||
CapacityScheduler cs = (CapacityScheduler) rm1.getResourceScheduler();
|
||||
RMNode rmNode1 = rm1.getRMContext().getRMNodes().get(nm1.getNodeId());
|
||||
FiCaSchedulerApp schedulerApp1 =
|
||||
cs.getApplicationAttempt(am1.getApplicationAttemptId());
|
||||
FiCaSchedulerApp schedulerApp2 =
|
||||
cs.getApplicationAttempt(am2.getApplicationAttemptId());
|
||||
|
||||
// Do nm1 heartbeats 1 times, will allocate a container on nm1 for app2
|
||||
cs.handle(new NodeUpdateSchedulerEvent(rmNode1));
|
||||
rm1.drainEvents();
|
||||
Assert.assertEquals(1, schedulerApp1.getLiveContainers().size());
|
||||
Assert.assertEquals(2, schedulerApp2.getLiveContainers().size());
|
||||
|
||||
rm1.close();
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue