YARN-8804. resourceLimits may be wrongly calculated when leaf-queue is blocked in cluster with 3+ level queues. Contributed by Tao Yang
(cherry picked from commit 6b988d821e
)
This commit is contained in:
parent
b28dacf60c
commit
17583e690a
|
@ -38,6 +38,9 @@ public class ResourceLimits {
|
||||||
// containers.
|
// containers.
|
||||||
private volatile Resource headroom;
|
private volatile Resource headroom;
|
||||||
|
|
||||||
|
// How much resource should be reserved for high-priority blocked queues
|
||||||
|
private Resource blockedHeadroom;
|
||||||
|
|
||||||
private boolean allowPreempt = false;
|
private boolean allowPreempt = false;
|
||||||
|
|
||||||
public ResourceLimits(Resource limit) {
|
public ResourceLimits(Resource limit) {
|
||||||
|
@ -81,4 +84,25 @@ public class ResourceLimits {
|
||||||
public void setIsAllowPreemption(boolean allowPreempt) {
|
public void setIsAllowPreemption(boolean allowPreempt) {
|
||||||
this.allowPreempt = allowPreempt;
|
this.allowPreempt = allowPreempt;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public void addBlockedHeadroom(Resource resource) {
|
||||||
|
if (blockedHeadroom == null) {
|
||||||
|
blockedHeadroom = Resource.newInstance(0, 0);
|
||||||
|
}
|
||||||
|
Resources.addTo(blockedHeadroom, resource);
|
||||||
|
}
|
||||||
|
|
||||||
|
public Resource getBlockedHeadroom() {
|
||||||
|
if (blockedHeadroom == null) {
|
||||||
|
return Resources.none();
|
||||||
|
}
|
||||||
|
return blockedHeadroom;
|
||||||
|
}
|
||||||
|
|
||||||
|
public Resource getNetLimit() {
|
||||||
|
if (blockedHeadroom != null) {
|
||||||
|
return Resources.subtract(limit, blockedHeadroom);
|
||||||
|
}
|
||||||
|
return limit;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -715,7 +715,6 @@ public class ParentQueue extends AbstractCSQueue {
|
||||||
SchedulingMode schedulingMode) {
|
SchedulingMode schedulingMode) {
|
||||||
CSAssignment assignment = CSAssignment.NULL_ASSIGNMENT;
|
CSAssignment assignment = CSAssignment.NULL_ASSIGNMENT;
|
||||||
|
|
||||||
Resource parentLimits = limits.getLimit();
|
|
||||||
printChildQueues();
|
printChildQueues();
|
||||||
|
|
||||||
// Try to assign to most 'under-served' sub-queue
|
// Try to assign to most 'under-served' sub-queue
|
||||||
|
@ -729,7 +728,7 @@ public class ParentQueue extends AbstractCSQueue {
|
||||||
|
|
||||||
// Get ResourceLimits of child queue before assign containers
|
// Get ResourceLimits of child queue before assign containers
|
||||||
ResourceLimits childLimits =
|
ResourceLimits childLimits =
|
||||||
getResourceLimitsOfChild(childQueue, cluster, parentLimits,
|
getResourceLimitsOfChild(childQueue, cluster, limits.getNetLimit(),
|
||||||
candidates.getPartition());
|
candidates.getPartition());
|
||||||
|
|
||||||
CSAssignment childAssignment = childQueue.assignContainers(cluster,
|
CSAssignment childAssignment = childQueue.assignContainers(cluster,
|
||||||
|
@ -751,16 +750,21 @@ public class ParentQueue extends AbstractCSQueue {
|
||||||
CSAssignment.SkippedType.QUEUE_LIMIT) {
|
CSAssignment.SkippedType.QUEUE_LIMIT) {
|
||||||
assignment = childAssignment;
|
assignment = childAssignment;
|
||||||
}
|
}
|
||||||
|
Resource blockedHeadroom = null;
|
||||||
|
if (childQueue instanceof LeafQueue) {
|
||||||
|
blockedHeadroom = childLimits.getHeadroom();
|
||||||
|
} else {
|
||||||
|
blockedHeadroom = childLimits.getBlockedHeadroom();
|
||||||
|
}
|
||||||
Resource resourceToSubtract = Resources.max(resourceCalculator,
|
Resource resourceToSubtract = Resources.max(resourceCalculator,
|
||||||
cluster, childLimits.getHeadroom(), Resources.none());
|
cluster, blockedHeadroom, Resources.none());
|
||||||
|
limits.addBlockedHeadroom(resourceToSubtract);
|
||||||
if(LOG.isDebugEnabled()) {
|
if(LOG.isDebugEnabled()) {
|
||||||
LOG.debug("Decrease parentLimits " + parentLimits +
|
LOG.debug("Decrease parentLimits " + limits.getLimit() +
|
||||||
" for " + this.getQueueName() + " by " +
|
" for " + this.getQueueName() + " by " +
|
||||||
resourceToSubtract + " as childQueue=" +
|
resourceToSubtract + " as childQueue=" +
|
||||||
childQueue.getQueueName() + " is blocked");
|
childQueue.getQueueName() + " is blocked");
|
||||||
}
|
}
|
||||||
parentLimits = Resources.subtract(parentLimits,
|
|
||||||
resourceToSubtract);
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -58,6 +58,8 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeRemoved
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateSchedulerEvent;
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateSchedulerEvent;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager;
|
import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager;
|
||||||
import org.apache.hadoop.yarn.server.utils.BuilderUtils;
|
import org.apache.hadoop.yarn.server.utils.BuilderUtils;
|
||||||
|
import org.apache.hadoop.yarn.util.resource.DominantResourceCalculator;
|
||||||
|
import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
|
||||||
import org.apache.hadoop.yarn.util.resource.Resources;
|
import org.apache.hadoop.yarn.util.resource.Resources;
|
||||||
import org.junit.Assert;
|
import org.junit.Assert;
|
||||||
import org.junit.Before;
|
import org.junit.Before;
|
||||||
|
@ -982,4 +984,82 @@ public class TestContainerAllocation {
|
||||||
Assert.assertEquals(2, lq.getMetrics().getAppsPending());
|
Assert.assertEquals(2, lq.getMetrics().getAppsPending());
|
||||||
rm1.close();
|
rm1.close();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test(timeout = 60000)
|
||||||
|
public void testAllocationCannotBeBlockedWhenFormerQueueReachedItsLimit()
|
||||||
|
throws Exception {
|
||||||
|
/**
|
||||||
|
* Queue structure:
|
||||||
|
* <pre>
|
||||||
|
* Root
|
||||||
|
* / | \
|
||||||
|
* a b c
|
||||||
|
* 10 20 70
|
||||||
|
* | \
|
||||||
|
* c1 c2
|
||||||
|
* 10(max=10) 90
|
||||||
|
* </pre>
|
||||||
|
* Test case:
|
||||||
|
* Create a cluster with two nodes whose node resource both are
|
||||||
|
* <10GB, 10core>, create queues as above, among them max-capacity of "c1"
|
||||||
|
* is 10 and others are all 100, so that max-capacity of queue "c1" is
|
||||||
|
* <2GB, 2core>,
|
||||||
|
* submit app1 to queue "c1" and launch am1(resource=<1GB, 1 core>) on nm1,
|
||||||
|
* submit app2 to queue "b" and launch am2(resource=<1GB, 1 core>) on nm1,
|
||||||
|
* app1 and app2 both ask one <2GB, 1core> containers
|
||||||
|
*
|
||||||
|
* Now queue "c" has lower capacity percentage than queue "b", the
|
||||||
|
* allocation sequence will be "a" -> "c" -> "b", queue "c1" has reached
|
||||||
|
* queue limit so that requests of app1 should be pending
|
||||||
|
*
|
||||||
|
* After nm1 do 1 heartbeat, scheduler should allocate one container for
|
||||||
|
* app2 on nm1.
|
||||||
|
*/
|
||||||
|
CapacitySchedulerConfiguration newConf =
|
||||||
|
(CapacitySchedulerConfiguration) TestUtils
|
||||||
|
.getConfigurationWithMultipleQueues(conf);
|
||||||
|
newConf.setQueues(CapacitySchedulerConfiguration.ROOT + ".c",
|
||||||
|
new String[] { "c1", "c2" });
|
||||||
|
newConf.setCapacity(CapacitySchedulerConfiguration.ROOT + ".c.c1", 10);
|
||||||
|
newConf
|
||||||
|
.setMaximumCapacity(CapacitySchedulerConfiguration.ROOT + ".c.c1", 10);
|
||||||
|
newConf.setCapacity(CapacitySchedulerConfiguration.ROOT + ".c.c2", 90);
|
||||||
|
newConf.setClass(CapacitySchedulerConfiguration.RESOURCE_CALCULATOR_CLASS,
|
||||||
|
DominantResourceCalculator.class, ResourceCalculator.class);
|
||||||
|
|
||||||
|
MockRM rm1 = new MockRM(newConf);
|
||||||
|
|
||||||
|
RMNodeLabelsManager nodeLabelsManager = new NullRMNodeLabelsManager();
|
||||||
|
nodeLabelsManager.init(newConf);
|
||||||
|
rm1.getRMContext().setNodeLabelManager(nodeLabelsManager);
|
||||||
|
rm1.start();
|
||||||
|
MockNM nm1 = rm1.registerNode("h1:1234", 10 * GB);
|
||||||
|
MockNM nm2 = rm1.registerNode("h2:1234", 10 * GB);
|
||||||
|
|
||||||
|
// launch an app to queue "c1", AM container should be launched on nm1
|
||||||
|
RMApp app1 = rm1.submitApp(1 * GB, "app", "user", null, "c1");
|
||||||
|
MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1);
|
||||||
|
|
||||||
|
// launch another app to queue "b", AM container should be launched on nm1
|
||||||
|
RMApp app2 = rm1.submitApp(1 * GB, "app", "user", null, "b");
|
||||||
|
MockAM am2 = MockRM.launchAndRegisterAM(app2, rm1, nm1);
|
||||||
|
|
||||||
|
am1.allocate("*", 2 * GB, 1, new ArrayList<ContainerId>());
|
||||||
|
am2.allocate("*", 2 * GB, 1, new ArrayList<ContainerId>());
|
||||||
|
|
||||||
|
CapacityScheduler cs = (CapacityScheduler) rm1.getResourceScheduler();
|
||||||
|
RMNode rmNode1 = rm1.getRMContext().getRMNodes().get(nm1.getNodeId());
|
||||||
|
FiCaSchedulerApp schedulerApp1 =
|
||||||
|
cs.getApplicationAttempt(am1.getApplicationAttemptId());
|
||||||
|
FiCaSchedulerApp schedulerApp2 =
|
||||||
|
cs.getApplicationAttempt(am2.getApplicationAttemptId());
|
||||||
|
|
||||||
|
// Do nm1 heartbeats 1 times, will allocate a container on nm1 for app2
|
||||||
|
cs.handle(new NodeUpdateSchedulerEvent(rmNode1));
|
||||||
|
rm1.drainEvents();
|
||||||
|
Assert.assertEquals(1, schedulerApp1.getLiveContainers().size());
|
||||||
|
Assert.assertEquals(2, schedulerApp2.getLiveContainers().size());
|
||||||
|
|
||||||
|
rm1.close();
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue