YARN-5077. Fix FSLeafQueue#getFairShare() for queues with zero fairshare. (Yufei Gu via kasha)
(cherry picked from commit 20f2799938
)
This commit is contained in:
parent
e9c1155233
commit
25be8105cd
|
@ -43,7 +43,6 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ActiveUsersManager;
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ActiveUsersManager;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerAppUtils;
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerAppUtils;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt;
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.policies.DominantResourceFairnessPolicy;
|
|
||||||
import org.apache.hadoop.yarn.util.resource.Resources;
|
import org.apache.hadoop.yarn.util.resource.Resources;
|
||||||
|
|
||||||
@Private
|
@Private
|
||||||
|
@ -482,8 +481,7 @@ public class FSLeafQueue extends FSQueue {
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Check whether this queue can run this application master under the
|
* Check whether this queue can run this application master under the
|
||||||
* maxAMShare limit. For FIFO and FAIR policies, check if the VCore usage
|
* maxAMShare limit.
|
||||||
* takes up the entire cluster or maxResources for the queue.
|
|
||||||
* @param amResource
|
* @param amResource
|
||||||
* @return true if this queue can run
|
* @return true if this queue can run
|
||||||
*/
|
*/
|
||||||
|
@ -493,24 +491,25 @@ public class FSLeafQueue extends FSQueue {
|
||||||
if (Math.abs(maxAMShare - -1.0f) < 0.0001) {
|
if (Math.abs(maxAMShare - -1.0f) < 0.0001) {
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
Resource maxAMResource = Resources.multiply(getFairShare(), maxAMShare);
|
|
||||||
Resource ifRunAMResource = Resources.add(amResourceUsage, amResource);
|
|
||||||
|
|
||||||
boolean overMaxAMShareLimit = policy
|
// If FairShare is zero, use min(maxShare, available resource) to compute
|
||||||
.checkIfAMResourceUsageOverLimit(ifRunAMResource, maxAMResource);
|
// maxAMResource
|
||||||
|
Resource maxResource = Resources.clone(getFairShare());
|
||||||
// For fair policy and fifo policy which doesn't check VCore usages,
|
if (maxResource.getMemorySize() == 0) {
|
||||||
// additionally check if the AM takes all available VCores or
|
maxResource.setMemory(
|
||||||
// over maxResource to avoid deadlock.
|
Math.min(scheduler.getRootQueueMetrics().getAvailableMB(),
|
||||||
if (!overMaxAMShareLimit && !policy.equals(
|
getMaxShare().getMemorySize()));
|
||||||
SchedulingPolicy.getInstance(DominantResourceFairnessPolicy.class))) {
|
|
||||||
overMaxAMShareLimit =
|
|
||||||
isVCoresOverMaxResource(ifRunAMResource.getVirtualCores()) ||
|
|
||||||
ifRunAMResource.getVirtualCores() >=
|
|
||||||
scheduler.getRootQueueMetrics().getAvailableVirtualCores();
|
|
||||||
}
|
}
|
||||||
|
|
||||||
return !overMaxAMShareLimit;
|
if (maxResource.getVirtualCoresSize() == 0) {
|
||||||
|
maxResource.setVirtualCores(Math.min(
|
||||||
|
scheduler.getRootQueueMetrics().getAvailableVirtualCores(),
|
||||||
|
getMaxShare().getVirtualCoresSize()));
|
||||||
|
}
|
||||||
|
|
||||||
|
Resource maxAMResource = Resources.multiply(maxResource, maxAMShare);
|
||||||
|
Resource ifRunAMResource = Resources.add(amResourceUsage, amResource);
|
||||||
|
return Resources.fitsIn(ifRunAMResource, maxAMResource);
|
||||||
}
|
}
|
||||||
|
|
||||||
public void addAMResourceUsage(Resource amResource) {
|
public void addAMResourceUsage(Resource amResource) {
|
||||||
|
|
|
@ -310,25 +310,6 @@ public abstract class FSQueue implements Queue, Schedulable {
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
|
||||||
* Helper method to check if requested VCores are over maxResource.
|
|
||||||
* @param requestedVCores the number of VCores requested
|
|
||||||
* @return true if the number of VCores requested is over the maxResource;
|
|
||||||
* false otherwise
|
|
||||||
*/
|
|
||||||
protected boolean isVCoresOverMaxResource(int requestedVCores) {
|
|
||||||
if (requestedVCores >= scheduler.getAllocationConfiguration().
|
|
||||||
getMaxResources(getName()).getVirtualCores()) {
|
|
||||||
return true;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (getParent() == null) {
|
|
||||||
return false;
|
|
||||||
}
|
|
||||||
|
|
||||||
return getParent().isVCoresOverMaxResource(requestedVCores);
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Returns true if queue has at least one app running.
|
* Returns true if queue has at least one app running.
|
||||||
*/
|
*/
|
||||||
|
|
|
@ -177,17 +177,6 @@ public abstract class SchedulingPolicy {
|
||||||
public abstract boolean checkIfUsageOverFairShare(
|
public abstract boolean checkIfUsageOverFairShare(
|
||||||
Resource usage, Resource fairShare);
|
Resource usage, Resource fairShare);
|
||||||
|
|
||||||
/**
|
|
||||||
* Check if a leaf queue's AM resource usage over its limit under this policy
|
|
||||||
*
|
|
||||||
* @param usage {@link Resource} the resource used by application masters
|
|
||||||
* @param maxAMResource {@link Resource} the maximum allowed resource for
|
|
||||||
* application masters
|
|
||||||
* @return true if AM resource usage is over the limit
|
|
||||||
*/
|
|
||||||
public abstract boolean checkIfAMResourceUsageOverLimit(
|
|
||||||
Resource usage, Resource maxAMResource);
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Get headroom by calculating the min of <code>clusterAvailable</code> and
|
* Get headroom by calculating the min of <code>clusterAvailable</code> and
|
||||||
* (<code>queueFairShare</code> - <code>queueUsage</code>) resources that are
|
* (<code>queueFairShare</code> - <code>queueUsage</code>) resources that are
|
||||||
|
|
|
@ -93,11 +93,6 @@ public class DominantResourceFairnessPolicy extends SchedulingPolicy {
|
||||||
return !Resources.fitsIn(usage, fairShare);
|
return !Resources.fitsIn(usage, fairShare);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
|
||||||
public boolean checkIfAMResourceUsageOverLimit(Resource usage, Resource maxAMResource) {
|
|
||||||
return !Resources.fitsIn(usage, maxAMResource);
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public Resource getHeadroom(Resource queueFairShare, Resource queueUsage,
|
public Resource getHeadroom(Resource queueFairShare, Resource queueUsage,
|
||||||
Resource maxAvailable) {
|
Resource maxAvailable) {
|
||||||
|
|
|
@ -150,11 +150,6 @@ public class FairSharePolicy extends SchedulingPolicy {
|
||||||
return Resources.greaterThan(RESOURCE_CALCULATOR, null, usage, fairShare);
|
return Resources.greaterThan(RESOURCE_CALCULATOR, null, usage, fairShare);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
|
||||||
public boolean checkIfAMResourceUsageOverLimit(Resource usage, Resource maxAMResource) {
|
|
||||||
return usage.getMemorySize() > maxAMResource.getMemorySize();
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public byte getApplicableDepth() {
|
public byte getApplicableDepth() {
|
||||||
return SchedulingPolicy.DEPTH_ANY;
|
return SchedulingPolicy.DEPTH_ANY;
|
||||||
|
|
|
@ -113,11 +113,6 @@ public class FifoPolicy extends SchedulingPolicy {
|
||||||
"as FifoPolicy only works for FSLeafQueue.");
|
"as FifoPolicy only works for FSLeafQueue.");
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
|
||||||
public boolean checkIfAMResourceUsageOverLimit(Resource usage, Resource maxAMResource) {
|
|
||||||
return usage.getMemorySize() > maxAMResource.getMemorySize();
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public Resource getHeadroom(Resource queueFairShare,
|
public Resource getHeadroom(Resource queueFairShare,
|
||||||
Resource queueUsage, Resource maxAvailable) {
|
Resource queueUsage, Resource maxAvailable) {
|
||||||
|
|
|
@ -1081,6 +1081,84 @@ public class TestFairScheduler extends FairSchedulerTestBase {
|
||||||
getCurrentReservation().getMemorySize());
|
getCurrentReservation().getMemorySize());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* The test verifies that zero-FairShare queues (because of zero/tiny
|
||||||
|
* weight) can get resources for the AM.
|
||||||
|
*/
|
||||||
|
@Test
|
||||||
|
public void testRequestAMResourceInZeroFairShareQueue() throws Exception {
|
||||||
|
conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, ALLOC_FILE);
|
||||||
|
|
||||||
|
PrintWriter out = new PrintWriter(new FileWriter(ALLOC_FILE));
|
||||||
|
out.println("<?xml version=\"1.0\"?>");
|
||||||
|
out.println("<allocations>");
|
||||||
|
out.println("<queue name=\"queue1\">");
|
||||||
|
out.println("<weight>0.0</weight>");
|
||||||
|
out.println("<maxResources>4096mb,10vcores</maxResources>");
|
||||||
|
out.println("<maxAMShare>0.5</maxAMShare>");
|
||||||
|
out.println("</queue>");
|
||||||
|
out.println("<queue name=\"queue2\">");
|
||||||
|
out.println("<weight>2.0</weight>");
|
||||||
|
out.println("</queue>");
|
||||||
|
out.println("<queue name=\"queue3\">");
|
||||||
|
out.println("<weight>0.000001</weight>");
|
||||||
|
out.println("</queue>");
|
||||||
|
out.println("</allocations>");
|
||||||
|
out.close();
|
||||||
|
|
||||||
|
scheduler.init(conf);
|
||||||
|
scheduler.start();
|
||||||
|
scheduler.reinitialize(conf, resourceManager.getRMContext());
|
||||||
|
|
||||||
|
RMNode node =
|
||||||
|
MockNodes.newNodeInfo(1, Resources.createResource(8192, 20),
|
||||||
|
0, "127.0.0.1");
|
||||||
|
NodeAddedSchedulerEvent nodeEvent = new NodeAddedSchedulerEvent(node);
|
||||||
|
NodeUpdateSchedulerEvent updateEvent = new NodeUpdateSchedulerEvent(node);
|
||||||
|
scheduler.handle(nodeEvent);
|
||||||
|
scheduler.update();
|
||||||
|
|
||||||
|
//create request for non-zero weight queue
|
||||||
|
createSchedulingRequest(1024, "root.queue2", "user2", 1);
|
||||||
|
scheduler.update();
|
||||||
|
scheduler.handle(updateEvent);
|
||||||
|
|
||||||
|
// A managed AM which need 3G memory will not get resource,
|
||||||
|
// since it request more than the maxAMShare (4G * 0.5 = 2G).
|
||||||
|
Resource amResource = Resource.newInstance(1024, 1);
|
||||||
|
int amPriority = RMAppAttemptImpl.AM_CONTAINER_PRIORITY.getPriority();
|
||||||
|
ApplicationAttemptId attId1 = createAppAttemptId(1, 1);
|
||||||
|
createApplicationWithAMResource(attId1, "root.queue1", "user1", amResource);
|
||||||
|
createSchedulingRequestExistingApplication(3 * 1024, 1, amPriority, attId1);
|
||||||
|
FSAppAttempt app1 = scheduler.getSchedulerApp(attId1);
|
||||||
|
scheduler.update();
|
||||||
|
scheduler.handle(updateEvent);
|
||||||
|
assertEquals("Application 1 should not be running",
|
||||||
|
0, app1.getLiveContainers().size());
|
||||||
|
|
||||||
|
// A managed AM which need 2G memory will get resource,
|
||||||
|
// since it request no more than the maxAMShare (4G * 0.5 = 2G).
|
||||||
|
ApplicationAttemptId attId2 = createAppAttemptId(2, 1);
|
||||||
|
createApplicationWithAMResource(attId2, "root.queue1", "user1", amResource);
|
||||||
|
createSchedulingRequestExistingApplication(2 * 1024, 1, amPriority, attId2);
|
||||||
|
FSAppAttempt app2 = scheduler.getSchedulerApp(attId2);
|
||||||
|
scheduler.update();
|
||||||
|
scheduler.handle(updateEvent);
|
||||||
|
assertEquals("Application 2 should be running",
|
||||||
|
1, app2.getLiveContainers().size());
|
||||||
|
|
||||||
|
// A managed AM which need 1G memory will get resource, even thought its
|
||||||
|
// fair share is 0 because its weight is tiny(0.000001).
|
||||||
|
ApplicationAttemptId attId3 = createAppAttemptId(3, 1);
|
||||||
|
createApplicationWithAMResource(attId3, "root.queue3", "user1", amResource);
|
||||||
|
createSchedulingRequestExistingApplication(1024, 1, amPriority, attId3);
|
||||||
|
FSAppAttempt app3 = scheduler.getSchedulerApp(attId3);
|
||||||
|
scheduler.update();
|
||||||
|
scheduler.handle(updateEvent);
|
||||||
|
assertEquals("Application 3 should be running",
|
||||||
|
1, app3.getLiveContainers().size());
|
||||||
|
}
|
||||||
|
|
||||||
@Test (timeout = 500000)
|
@Test (timeout = 500000)
|
||||||
public void testContainerReservationNotExceedingQueueMax() throws Exception {
|
public void testContainerReservationNotExceedingQueueMax() throws Exception {
|
||||||
conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, ALLOC_FILE);
|
conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, ALLOC_FILE);
|
||||||
|
|
Loading…
Reference in New Issue