YARN-4866. FairScheduler: AMs can consume all vcores leading to a livelock when using FAIR policy. (Yufei Gu via kasha)
This commit is contained in:
parent
013532a95e
commit
4f513a4a8e
|
@ -43,6 +43,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ActiveUsersManager;
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ActiveUsersManager;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerAppUtils;
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerAppUtils;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt;
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt;
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.policies.DominantResourceFairnessPolicy;
|
||||||
import org.apache.hadoop.yarn.util.resource.Resources;
|
import org.apache.hadoop.yarn.util.resource.Resources;
|
||||||
|
|
||||||
@Private
|
@Private
|
||||||
|
@ -481,8 +482,8 @@ public class FSLeafQueue extends FSQueue {
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Check whether this queue can run this application master under the
|
* Check whether this queue can run this application master under the
|
||||||
* maxAMShare limit
|
* maxAMShare limit. For FIFO and FAIR policies, check if the VCore usage
|
||||||
*
|
* takes up the entire cluster or maxResources for the queue.
|
||||||
* @param amResource
|
* @param amResource
|
||||||
* @return true if this queue can run
|
* @return true if this queue can run
|
||||||
*/
|
*/
|
||||||
|
@ -494,8 +495,22 @@ public class FSLeafQueue extends FSQueue {
|
||||||
}
|
}
|
||||||
Resource maxAMResource = Resources.multiply(getFairShare(), maxAMShare);
|
Resource maxAMResource = Resources.multiply(getFairShare(), maxAMShare);
|
||||||
Resource ifRunAMResource = Resources.add(amResourceUsage, amResource);
|
Resource ifRunAMResource = Resources.add(amResourceUsage, amResource);
|
||||||
return !policy
|
|
||||||
.checkIfAMResourceUsageOverLimit(ifRunAMResource, maxAMResource);
|
boolean overMaxAMShareLimit = policy
|
||||||
|
.checkIfAMResourceUsageOverLimit(ifRunAMResource, maxAMResource);
|
||||||
|
|
||||||
|
// For the fair and fifo policies, which don't check VCore usage,
|
||||||
|
// additionally check if the AM would take all available VCores or
|
||||||
|
// reach maxResources, to avoid a deadlock.
|
||||||
|
if (!overMaxAMShareLimit && !policy.equals(
|
||||||
|
SchedulingPolicy.getInstance(DominantResourceFairnessPolicy.class))) {
|
||||||
|
overMaxAMShareLimit =
|
||||||
|
isVCoresOverMaxResource(ifRunAMResource.getVirtualCores()) ||
|
||||||
|
ifRunAMResource.getVirtualCores() >=
|
||||||
|
scheduler.getRootQueueMetrics().getAvailableVirtualCores();
|
||||||
|
}
|
||||||
|
|
||||||
|
return !overMaxAMShareLimit;
|
||||||
}
|
}
|
||||||
|
|
||||||
public void addAMResourceUsage(Resource amResource) {
|
public void addAMResourceUsage(Resource amResource) {
|
||||||
|
|
|
@ -310,6 +310,25 @@ public abstract class FSQueue implements Queue, Schedulable {
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Helper method to check if requested VCores are at or over maxResources.
|
||||||
|
* @param requestedVCores the number of VCores requested
|
||||||
|
* @return true if the requested VCores are at or over the maxResources of
|
||||||
|
* this queue or any of its ancestor queues; false otherwise
|
||||||
|
*/
|
||||||
|
protected boolean isVCoresOverMaxResource(int requestedVCores) {
|
||||||
|
if (requestedVCores >= scheduler.getAllocationConfiguration().
|
||||||
|
getMaxResources(getName()).getVirtualCores()) {
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (getParent() == null) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
return getParent().isVCoresOverMaxResource(requestedVCores);
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Returns true if queue has at least one app running.
|
* Returns true if queue has at least one app running.
|
||||||
*/
|
*/
|
||||||
|
|
|
@ -3287,6 +3287,7 @@ public class TestFairScheduler extends FairSchedulerTestBase {
|
||||||
@Test
|
@Test
|
||||||
public void testQueueMaxAMShareDefault() throws Exception {
|
public void testQueueMaxAMShareDefault() throws Exception {
|
||||||
conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, ALLOC_FILE);
|
conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, ALLOC_FILE);
|
||||||
|
conf.setInt(YarnConfiguration.RM_SCHEDULER_MAXIMUM_ALLOCATION_VCORES, 6);
|
||||||
|
|
||||||
PrintWriter out = new PrintWriter(new FileWriter(ALLOC_FILE));
|
PrintWriter out = new PrintWriter(new FileWriter(ALLOC_FILE));
|
||||||
out.println("<?xml version=\"1.0\"?>");
|
out.println("<?xml version=\"1.0\"?>");
|
||||||
|
@ -3297,11 +3298,14 @@ public class TestFairScheduler extends FairSchedulerTestBase {
|
||||||
out.println("<maxAMShare>0.4</maxAMShare>");
|
out.println("<maxAMShare>0.4</maxAMShare>");
|
||||||
out.println("</queue>");
|
out.println("</queue>");
|
||||||
out.println("<queue name=\"queue3\">");
|
out.println("<queue name=\"queue3\">");
|
||||||
|
out.println("<maxResources>10240 mb 4 vcores</maxResources>");
|
||||||
out.println("</queue>");
|
out.println("</queue>");
|
||||||
out.println("<queue name=\"queue4\">");
|
out.println("<queue name=\"queue4\">");
|
||||||
out.println("</queue>");
|
out.println("</queue>");
|
||||||
out.println("<queue name=\"queue5\">");
|
out.println("<queue name=\"queue5\">");
|
||||||
out.println("</queue>");
|
out.println("</queue>");
|
||||||
|
out.println(
|
||||||
|
"<defaultQueueSchedulingPolicy>fair</defaultQueueSchedulingPolicy>");
|
||||||
out.println("</allocations>");
|
out.println("</allocations>");
|
||||||
out.close();
|
out.close();
|
||||||
|
|
||||||
|
@ -3310,7 +3314,7 @@ public class TestFairScheduler extends FairSchedulerTestBase {
|
||||||
scheduler.reinitialize(conf, resourceManager.getRMContext());
|
scheduler.reinitialize(conf, resourceManager.getRMContext());
|
||||||
|
|
||||||
RMNode node =
|
RMNode node =
|
||||||
MockNodes.newNodeInfo(1, Resources.createResource(8192, 20),
|
MockNodes.newNodeInfo(1, Resources.createResource(8192, 10),
|
||||||
0, "127.0.0.1");
|
0, "127.0.0.1");
|
||||||
NodeAddedSchedulerEvent nodeEvent = new NodeAddedSchedulerEvent(node);
|
NodeAddedSchedulerEvent nodeEvent = new NodeAddedSchedulerEvent(node);
|
||||||
NodeUpdateSchedulerEvent updateEvent = new NodeUpdateSchedulerEvent(node);
|
NodeUpdateSchedulerEvent updateEvent = new NodeUpdateSchedulerEvent(node);
|
||||||
|
@ -3378,6 +3382,44 @@ public class TestFairScheduler extends FairSchedulerTestBase {
|
||||||
0, app2.getLiveContainers().size());
|
0, app2.getLiveContainers().size());
|
||||||
assertEquals("Queue2's AM resource usage should be 0 MB memory",
|
assertEquals("Queue2's AM resource usage should be 0 MB memory",
|
||||||
0, queue2.getAmResourceUsage().getMemory());
|
0, queue2.getAmResourceUsage().getMemory());
|
||||||
|
|
||||||
|
// Remove the app2
|
||||||
|
AppAttemptRemovedSchedulerEvent appRemovedEvent2 =
|
||||||
|
new AppAttemptRemovedSchedulerEvent(attId2,
|
||||||
|
RMAppAttemptState.FINISHED, false);
|
||||||
|
scheduler.handle(appRemovedEvent2);
|
||||||
|
scheduler.update();
|
||||||
|
|
||||||
|
// AM3 can pass the fair share check, but it would take all available VCores,
|
||||||
|
// so AM3 is not accepted.
|
||||||
|
ApplicationAttemptId attId3 = createAppAttemptId(3, 1);
|
||||||
|
createApplicationWithAMResource(attId3, "queue3", "test1", amResource1);
|
||||||
|
createSchedulingRequestExistingApplication(1024, 6, amPriority, attId3);
|
||||||
|
FSAppAttempt app3 = scheduler.getSchedulerApp(attId3);
|
||||||
|
scheduler.update();
|
||||||
|
scheduler.handle(updateEvent);
|
||||||
|
assertEquals("Application3's AM resource shouldn't be updated",
|
||||||
|
0, app3.getAMResource().getMemory());
|
||||||
|
assertEquals("Application3's AM should not be running",
|
||||||
|
0, app3.getLiveContainers().size());
|
||||||
|
assertEquals("Queue3's AM resource usage should be 0 MB memory",
|
||||||
|
0, queue3.getAmResourceUsage().getMemory());
|
||||||
|
|
||||||
|
// AM4 can pass the fair share check and it doesn't take all
|
||||||
|
// available VCores, but it needs 5 VCores, which is more than
|
||||||
|
// maxResources (4 VCores), so AM4 is not accepted.
|
||||||
|
ApplicationAttemptId attId4 = createAppAttemptId(4, 1);
|
||||||
|
createApplicationWithAMResource(attId4, "queue3", "test1", amResource1);
|
||||||
|
createSchedulingRequestExistingApplication(1024, 5, amPriority, attId4);
|
||||||
|
FSAppAttempt app4 = scheduler.getSchedulerApp(attId4);
|
||||||
|
scheduler.update();
|
||||||
|
scheduler.handle(updateEvent);
|
||||||
|
assertEquals("Application4's AM resource shouldn't be updated",
|
||||||
|
0, app4.getAMResource().getMemory());
|
||||||
|
assertEquals("Application4's AM should not be running",
|
||||||
|
0, app4.getLiveContainers().size());
|
||||||
|
assertEquals("Queue3's AM resource usage should be 0 MB memory",
|
||||||
|
0, queue3.getAmResourceUsage().getMemory());
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
Loading…
Reference in New Issue