YARN-4280. CapacityScheduler reservations may not prevent indefinite postponement on a busy cluster. Contributed by Kuhu Shukla

This commit is contained in:
Jason Lowe 2016-08-03 19:01:14 +00:00
parent c322e749d6
commit aca7eea611
6 changed files with 359 additions and 27 deletions

View File

@ -32,35 +32,44 @@ public class CSAssignment {
public static final CSAssignment NULL_ASSIGNMENT =
new CSAssignment(Resources.createResource(0, 0), NodeType.NODE_LOCAL);
public static final CSAssignment SKIP_ASSIGNMENT = new CSAssignment(true);
public static final CSAssignment SKIP_ASSIGNMENT =
new CSAssignment(SkippedType.OTHER);
private Resource resource;
private NodeType type;
private RMContainer excessReservation;
private FiCaSchedulerApp application;
private final boolean skipped;
private SkippedType skipped;
/**
* Reason for the queue to get skipped.
*/
public enum SkippedType {
// Nothing was skipped; used by the regular (non-skip) constructors and
// whenever the allocation state is not APP_SKIPPED.
NONE,
// Set when the allocation state is QUEUE_SKIPPED. LeafQueue returns such
// an assignment immediately, and ParentQueue shrinks the limit it hands to
// the remaining children by the blocked child's headroom.
QUEUE_LIMIT,
// Any other skip: set when the allocation state is APP_SKIPPED and used by
// SKIP_ASSIGNMENT. LeafQueue reacts by updating the AM diagnostics.
OTHER
}
private boolean fulfilledReservation;
private final AssignmentInformation assignmentInformation;
private boolean increaseAllocation;
public CSAssignment(Resource resource, NodeType type) {
this(resource, type, null, null, false, false);
this(resource, type, null, null, SkippedType.NONE, false);
}
public CSAssignment(FiCaSchedulerApp application,
RMContainer excessReservation) {
this(excessReservation.getContainer().getResource(), NodeType.NODE_LOCAL,
excessReservation, application, false, false);
excessReservation, application, SkippedType.NONE, false);
}
public CSAssignment(boolean skipped) {
public CSAssignment(SkippedType skipped) {
this(Resource.newInstance(0, 0), NodeType.NODE_LOCAL, null, null, skipped,
false);
}
public CSAssignment(Resource resource, NodeType type,
RMContainer excessReservation, FiCaSchedulerApp application,
boolean skipped, boolean fulfilledReservation) {
SkippedType skipped, boolean fulfilledReservation) {
this.resource = resource;
this.type = type;
this.excessReservation = excessReservation;
@ -102,10 +111,14 @@ public class CSAssignment {
excessReservation = rmContainer;
}
public boolean getSkipped() {
public SkippedType getSkippedType() {
return skipped;
}
/**
* Records why this assignment was skipped, replacing any previous value.
*
* @param skippedType the skip reason to store; {@link SkippedType#NONE}
* means the assignment was not skipped
*/
public void setSkippedType(SkippedType skippedType) {
this.skipped = skippedType;
}
@Override
public String toString() {
String ret = "resource:" + resource.toString();

View File

@ -58,6 +58,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEventType;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ActiveUsersManager;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceLimits;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceUsage;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedContainerChangeRequest;
@ -931,8 +932,12 @@ public class LeafQueue extends AbstractCSQueue {
// Done
return assignment;
} else if (assignment.getSkipped()) {
} else if (assignment.getSkippedType()
== CSAssignment.SkippedType.OTHER) {
application.updateNodeInfoForAMDiagnostics(node);
} else if(assignment.getSkippedType()
== CSAssignment.SkippedType.QUEUE_LIMIT) {
return assignment;
} else {
// If we don't allocate anything, and it is not skipped by application,
// we will return to respect FIFO of applications

View File

@ -467,6 +467,7 @@ public class ParentQueue extends AbstractCSQueue {
" cluster=" + clusterResource);
} else {
assignment.setSkippedType(assignedToChild.getSkippedType());
break;
}
@ -500,13 +501,13 @@ public class ParentQueue extends AbstractCSQueue {
}
private ResourceLimits getResourceLimitsOfChild(CSQueue child,
Resource clusterResource, ResourceLimits parentLimits) {
Resource clusterResource, Resource parentLimits) {
// Set resource-limit of a given child, child.limit =
// min(my.limit - my.used + child.used, child.max)
// Parent available resource = parent-limit - parent-used-resource
Resource parentMaxAvailableResource =
Resources.subtract(parentLimits.getLimit(), getUsedResources());
Resources.subtract(parentLimits, getUsedResources());
// Child's limit = parent-available-resource + child-used
Resource childLimit =
@ -552,9 +553,9 @@ public class ParentQueue extends AbstractCSQueue {
private synchronized CSAssignment assignContainersToChildQueues(
Resource cluster, FiCaSchedulerNode node, ResourceLimits limits,
SchedulingMode schedulingMode) {
CSAssignment assignment =
new CSAssignment(Resources.createResource(0, 0), NodeType.NODE_LOCAL);
CSAssignment assignment = CSAssignment.NULL_ASSIGNMENT;
Resource parentLimits = limits.getLimit();
printChildQueues();
// Try to assign to most 'under-served' sub-queue
@ -568,20 +569,20 @@ public class ParentQueue extends AbstractCSQueue {
// Get ResourceLimits of child queue before assign containers
ResourceLimits childLimits =
getResourceLimitsOfChild(childQueue, cluster, limits);
getResourceLimitsOfChild(childQueue, cluster, parentLimits);
assignment = childQueue.assignContainers(cluster, node,
CSAssignment childAssignment = childQueue.assignContainers(cluster, node,
childLimits, schedulingMode);
if(LOG.isDebugEnabled()) {
LOG.debug("Assigned to queue: " + childQueue.getQueuePath() +
" stats: " + childQueue + " --> " +
assignment.getResource() + ", " + assignment.getType());
" stats: " + childQueue + " --> " +
childAssignment.getResource() + ", " + childAssignment.getType());
}
// If we do assign, remove the queue and re-insert in-order to re-sort
if (Resources.greaterThan(
resourceCalculator, cluster,
assignment.getResource(), Resources.none())) {
childAssignment.getResource(), Resources.none())) {
// Only update childQueues when we doing non-partitioned node
// allocation.
if (RMNodeLabelsManager.NO_LABEL.equals(node.getPartition())) {
@ -594,7 +595,24 @@ public class ParentQueue extends AbstractCSQueue {
printChildQueues();
}
}
assignment = childAssignment;
break;
} else if (childAssignment.getSkippedType() ==
CSAssignment.SkippedType.QUEUE_LIMIT) {
if (assignment.getSkippedType() !=
CSAssignment.SkippedType.QUEUE_LIMIT) {
assignment = childAssignment;
}
Resource resourceToSubtract = Resources.max(resourceCalculator,
cluster, childLimits.getHeadroom(), Resources.none());
if(LOG.isDebugEnabled()) {
LOG.debug("Decrease parentLimits " + parentLimits +
" for " + this.getQueueName() + " by " +
resourceToSubtract + " as childQueue=" +
childQueue.getQueueName() + " is blocked");
}
parentLimits = Resources.subtract(parentLimits,
resourceToSubtract);
}
}
@ -715,7 +733,8 @@ public class ParentQueue extends AbstractCSQueue {
for (CSQueue childQueue : childQueues) {
// Get ResourceLimits of child queue before assign containers
ResourceLimits childLimits =
getResourceLimitsOfChild(childQueue, clusterResource, resourceLimits);
getResourceLimitsOfChild(childQueue, clusterResource,
resourceLimits.getLimit());
childQueue.updateClusterResource(clusterResource, childLimits);
}

View File

@ -55,8 +55,10 @@ public abstract class AbstractContainerAllocator {
Resource clusterResource, ContainerAllocation result,
RMContainer rmContainer) {
// Handle skipped
boolean skipped =
(result.getAllocationState() == AllocationState.APP_SKIPPED);
CSAssignment.SkippedType skipped =
(result.getAllocationState() == AllocationState.APP_SKIPPED) ?
CSAssignment.SkippedType.OTHER :
CSAssignment.SkippedType.NONE;
CSAssignment assignment = new CSAssignment(skipped);
assignment.setApplication(application);
@ -108,6 +110,11 @@ public abstract class AbstractContainerAllocator {
assignment.setFulfilledReservation(true);
}
}
} else {
if (result.getAllocationState() == AllocationState.QUEUE_SKIPPED) {
assignment.setSkippedType(
CSAssignment.SkippedType.QUEUE_LIMIT);
}
}
return assignment;

View File

@ -70,7 +70,7 @@ public class IncreaseContainerAllocator extends AbstractContainerAllocator {
SchedContainerChangeRequest request) {
CSAssignment assignment =
new CSAssignment(request.getDeltaCapacity(), NodeType.NODE_LOCAL, null,
application, false, false);
application, CSAssignment.SkippedType.NONE, false);
Resources.addTo(assignment.getAssignmentInformation().getReserved(),
request.getDeltaCapacity());
assignment.getAssignmentInformation().incrReservations();
@ -87,7 +87,7 @@ public class IncreaseContainerAllocator extends AbstractContainerAllocator {
SchedContainerChangeRequest request, boolean fromReservation) {
CSAssignment assignment =
new CSAssignment(request.getDeltaCapacity(), NodeType.NODE_LOCAL, null,
application, false, fromReservation);
application, CSAssignment.SkippedType.NONE, fromReservation);
Resources.addTo(assignment.getAssignmentInformation().getAllocated(),
request.getDeltaCapacity());
assignment.getAssignmentInformation().incrAllocations();
@ -308,7 +308,8 @@ public class IncreaseContainerAllocator extends AbstractContainerAllocator {
// Try to allocate the increase request
assigned =
allocateIncreaseRequest(node, clusterResource, increaseRequest);
if (!assigned.getSkipped()) {
if (assigned.getSkippedType()
== CSAssignment.SkippedType.NONE) {
// When we don't skip this request, which means we either allocated
// OR reserved this request. We will break
break;
@ -324,7 +325,8 @@ public class IncreaseContainerAllocator extends AbstractContainerAllocator {
}
// We may have allocated something
if (assigned != null && !assigned.getSkipped()) {
if (assigned != null && assigned.getSkippedType()
== CSAssignment.SkippedType.NONE) {
break;
}
}

View File

@ -113,6 +113,8 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeResourceUpdateEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.
ContainerExpiredSchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplication;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt;
@ -164,6 +166,12 @@ public class TestCapacityScheduler {
private static final String B3 = B + ".b3";
private static float A_CAPACITY = 10.5f;
private static float B_CAPACITY = 89.5f;
// Queue paths for the two-level hierarchy built by
// setupOtherBlockedQueueConfiguration: parents p1 and p2 under ROOT, with
// leaf queues x1/x2 below p1 and y1/y2 below p2.
private static final String P1 = CapacitySchedulerConfiguration.ROOT + ".p1";
private static final String P2 = CapacitySchedulerConfiguration.ROOT + ".p2";
private static final String X1 = P1 + ".x1";
private static final String X2 = P1 + ".x2";
private static final String Y1 = P2 + ".y1";
private static final String Y2 = P2 + ".y2";
private static float A1_CAPACITY = 30;
private static float A2_CAPACITY = 70;
private static float B1_CAPACITY = 79.2f;
@ -410,6 +418,51 @@ public class TestCapacityScheduler {
return conf;
}
/**
 * Configures two top-level queues for the blocked-queue test: "a" with
 * 80% capacity and "b" with 20% capacity. Both queues get a user limit
 * factor of 100 and a maximum capacity of 100.
 *
 * @param conf the configuration to populate
 * @return the same {@code conf} instance, for chaining
 */
private CapacitySchedulerConfiguration setupBlockedQueueConfiguration(
    CapacitySchedulerConfiguration conf) {
  final String[] topLevelQueueNames = {"a", "b"};
  final String[] topLevelQueuePaths = {A, B};

  // Top-level queues and their guaranteed shares.
  conf.setQueues(CapacitySchedulerConfiguration.ROOT, topLevelQueueNames);
  conf.setCapacity(A, 80f);
  conf.setCapacity(B, 20f);

  // Same user limit factor for every queue.
  for (String queuePath : topLevelQueuePaths) {
    conf.setUserLimitFactor(queuePath, 100);
  }
  // Same maximum capacity for every queue.
  for (String queuePath : topLevelQueuePaths) {
    conf.setMaximumCapacity(queuePath, 100);
  }

  LOG.info("Setup top-level queues a and b");
  return conf;
}
/**
 * Configures a two-level queue hierarchy for the reservation test.
 * <ul>
 *   <li>p1: capacity 50, maximum capacity 50, with children x1 (80) and
 *       x2 (20), each with maximum capacity 100;</li>
 *   <li>p2: capacity 50, maximum capacity 100, with children y1 (80) and
 *       y2 (20), for which no maximum capacity is set here.</li>
 * </ul>
 * Every leaf queue gets a user limit factor of 2.
 *
 * @param conf the configuration to populate
 * @return the same {@code conf} instance, for chaining
 */
private CapacitySchedulerConfiguration setupOtherBlockedQueueConfiguration(
    CapacitySchedulerConfiguration conf) {
  final String[] parentQueueNames = {"p1", "p2"};
  final String[] p1ChildNames = {"x1", "x2"};
  final String[] p2ChildNames = {"y1", "y2"};
  final float leafUserLimitFactor = 2f;

  // Top level: p1 is capped at its own share, p2 may use the whole cluster.
  conf.setQueues(CapacitySchedulerConfiguration.ROOT, parentQueueNames);
  conf.setCapacity(P1, 50f);
  conf.setMaximumCapacity(P1, 50f);
  conf.setCapacity(P2, 50f);
  conf.setMaximumCapacity(P2, 100f);

  // Second level below p1.
  conf.setQueues(P1, p1ChildNames);
  conf.setCapacity(X1, 80f);
  conf.setMaximumCapacity(X1, 100f);
  conf.setUserLimitFactor(X1, leafUserLimitFactor);
  conf.setCapacity(X2, 20f);
  conf.setMaximumCapacity(X2, 100f);
  conf.setUserLimitFactor(X2, leafUserLimitFactor);

  // Second level below p2.
  conf.setQueues(P2, p2ChildNames);
  conf.setCapacity(Y1, 80f);
  conf.setUserLimitFactor(Y1, leafUserLimitFactor);
  conf.setCapacity(Y2, 20f);
  conf.setUserLimitFactor(Y2, leafUserLimitFactor);

  return conf;
}
@Test
public void testMaximumCapacitySetup() {
float delta = 0.0000001f;
@ -3374,4 +3427,237 @@ public class TestCapacityScheduler {
Assert.assertEquals(availableResource.getVirtualCores(), 0);
}
/**
 * Exercises the p1/p2 hierarchy on a 16 GB cluster (two 8 GB nodes).
 * <ol>
 *   <li>y1 is given 4 GB and x1 7 GB.</li>
 *   <li>x2 then asks for 2 GB; the assertions expect nothing to be
 *       allocated to it (p1 stays at 7 GB).</li>
 *   <li>A further 1 GB request from x1 is expected to fail, while four more
 *       1 GB requests from y1 are expected to succeed (p2 reaches 8 GB).</li>
 *   <li>After one x1 container expires, x2 is expected to get its 2 GB,
 *       leaving p1 and p2 at 8 GB each and root at 16 GB.</li>
 * </ol>
 * The ResourceManager is stopped in a {@code finally} block so that a
 * failing assertion does not leave it running.
 */
@Test
public void testCSReservationWithRootUnblocked() throws Exception {
  CapacitySchedulerConfiguration conf = new CapacitySchedulerConfiguration();
  conf.setResourceComparator(DominantResourceCalculator.class);
  setupOtherBlockedQueueConfiguration(conf);
  conf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class,
      ResourceScheduler.class);
  MockRM rm = new MockRM(conf);
  rm.start();
  try {
    CapacityScheduler cs = (CapacityScheduler) rm.getResourceScheduler();
    ParentQueue q = (ParentQueue) cs.getQueue("p1");
    Assert.assertNotNull(q);

    // Two 8 GB / 8 vcore nodes.
    String host = "127.0.0.1";
    String host1 = "test";
    RMNode node =
        MockNodes.newNodeInfo(0, Resource.newInstance(8 * GB, 8), 1, host);
    RMNode node1 =
        MockNodes.newNodeInfo(0, Resource.newInstance(8 * GB, 8), 2, host1);
    cs.handle(new NodeAddedSchedulerEvent(node));
    cs.handle(new NodeAddedSchedulerEvent(node1));

    ApplicationAttemptId appAttemptId1 =
        appHelper(rm, cs, 100, 1, "x1", "userX1");
    ApplicationAttemptId appAttemptId2 =
        appHelper(rm, cs, 100, 2, "x2", "userX2");
    ApplicationAttemptId appAttemptId3 =
        appHelper(rm, cs, 100, 3, "y1", "userY1");
    RecordFactory recordFactory =
        RecordFactoryProvider.getRecordFactory(null);
    Priority priority = TestUtils.createMockPriority(1);

    // Fill y1 with four 1 GB containers.
    for (int i = 0; i < 4; i++) {
      ResourceRequest y1Req = TestUtils.createResourceRequest(
          ResourceRequest.ANY, 1 * GB, 1, true, priority, recordFactory);
      cs.allocate(appAttemptId3,
          Collections.<ResourceRequest>singletonList(y1Req),
          Collections.<ContainerId>emptyList(),
          null, null, null, null);
      CapacityScheduler.schedule(cs);
    }
    assertEquals("Y1 Used Resource should be 4 GB", 4 * GB,
        cs.getQueue("y1").getUsedResources().getMemorySize());
    assertEquals("P2 Used Resource should be 4 GB", 4 * GB,
        cs.getQueue("p2").getUsedResources().getMemorySize());

    // Fill x1 with seven 1 GB containers.
    for (int i = 0; i < 7; i++) {
      ResourceRequest x1Req = TestUtils.createResourceRequest(
          ResourceRequest.ANY, 1 * GB, 1, true, priority, recordFactory);
      cs.allocate(appAttemptId1,
          Collections.<ResourceRequest>singletonList(x1Req),
          Collections.<ContainerId>emptyList(),
          null, null, null, null);
      CapacityScheduler.schedule(cs);
    }
    assertEquals("X1 Used Resource should be 7 GB", 7 * GB,
        cs.getQueue("x1").getUsedResources().getMemorySize());
    assertEquals("P1 Used Resource should be 7 GB", 7 * GB,
        cs.getQueue("p1").getUsedResources().getMemorySize());

    // x2 asks for 2 GB; nothing is expected to be allocated to it yet.
    ResourceRequest x2Req = TestUtils.createResourceRequest(
        ResourceRequest.ANY, 2 * GB, 1, true, priority, recordFactory);
    cs.allocate(appAttemptId2,
        Collections.<ResourceRequest>singletonList(x2Req),
        Collections.<ContainerId>emptyList(),
        null, null, null, null);
    CapacityScheduler.schedule(cs);
    assertEquals("X2 Used Resource should be 0", 0,
        cs.getQueue("x2").getUsedResources().getMemorySize());
    assertEquals("P1 Used Resource should be 7 GB", 7 * GB,
        cs.getQueue("p1").getUsedResources().getMemorySize());

    //this assign should fail
    ResourceRequest extraX1Req = TestUtils.createResourceRequest(
        ResourceRequest.ANY, 1 * GB, 1, true, priority, recordFactory);
    cs.allocate(appAttemptId1,
        Collections.<ResourceRequest>singletonList(extraX1Req),
        Collections.<ContainerId>emptyList(),
        null, null, null, null);
    CapacityScheduler.schedule(cs);
    assertEquals("X1 Used Resource should be 7 GB", 7 * GB,
        cs.getQueue("x1").getUsedResources().getMemorySize());
    assertEquals("P1 Used Resource should be 7 GB", 7 * GB,
        cs.getQueue("p1").getUsedResources().getMemorySize());

    //this should get thru
    for (int i = 0; i < 4; i++) {
      ResourceRequest y1Req = TestUtils.createResourceRequest(
          ResourceRequest.ANY, 1 * GB, 1, true, priority, recordFactory);
      cs.allocate(appAttemptId3,
          Collections.<ResourceRequest>singletonList(y1Req),
          Collections.<ContainerId>emptyList(),
          null, null, null, null);
      CapacityScheduler.schedule(cs);
    }
    assertEquals("P2 Used Resource should be 8 GB", 8 * GB,
        cs.getQueue("p2").getUsedResources().getMemorySize());

    //Free a container from X1
    ContainerId containerId = ContainerId.newContainerId(appAttemptId1, 2);
    cs.handle(new ContainerExpiredSchedulerEvent(containerId));

    //Schedule pending request
    CapacityScheduler.schedule(cs);
    assertEquals("X2 Used Resource should be 2 GB", 2 * GB,
        cs.getQueue("x2").getUsedResources().getMemorySize());
    assertEquals("P1 Used Resource should be 8 GB", 8 * GB,
        cs.getQueue("p1").getUsedResources().getMemorySize());
    assertEquals("P2 Used Resource should be 8 GB", 8 * GB,
        cs.getQueue("p2").getUsedResources().getMemorySize());
    assertEquals("Root Used Resource should be 16 GB", 16 * GB,
        cs.getRootQueue().getUsedResources().getMemorySize());
  } finally {
    // Always release the ResourceManager, even when an assertion fails.
    rm.stop();
  }
}
/**
 * Exercises queues "a" (80%) and "b" (20%) on a 16 GB cluster (two 8 GB
 * nodes).
 * <ol>
 *   <li>An app in "a" gets one 2 GB container and an app in "b" gets
 *       thirteen 1 GB containers, for 15 GB in total.</li>
 *   <li>"a" then asks for another 2 GB and "b" for another 1 GB; the
 *       assertions expect neither queue to grow (a = 2 GB, b = 13 GB).</li>
 *   <li>After two of b's containers expire, the assertions expect a = 4 GB,
 *       b = 12 GB and root = 16 GB.</li>
 * </ol>
 * The ResourceManager is stopped in a {@code finally} block so that a
 * failing assertion does not leave it running.
 */
@Test
public void testCSQueueBlocked() throws Exception {
  CapacitySchedulerConfiguration conf = new CapacitySchedulerConfiguration();
  setupBlockedQueueConfiguration(conf);
  conf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class,
      ResourceScheduler.class);
  MockRM rm = new MockRM(conf);
  rm.start();
  try {
    CapacityScheduler cs = (CapacityScheduler) rm.getResourceScheduler();
    LeafQueue q = (LeafQueue) cs.getQueue("a");
    Assert.assertNotNull(q);

    // Two 8 GB / 8 vcore nodes.
    String host = "127.0.0.1";
    String host1 = "test";
    RMNode node =
        MockNodes.newNodeInfo(0, Resource.newInstance(8 * GB, 8), 1, host);
    RMNode node1 =
        MockNodes.newNodeInfo(0, Resource.newInstance(8 * GB, 8), 2, host1);
    cs.handle(new NodeAddedSchedulerEvent(node));
    cs.handle(new NodeAddedSchedulerEvent(node1));

    //add app begin
    ApplicationAttemptId appAttemptId1 =
        appHelper(rm, cs, 100, 1, "a", "user1");
    ApplicationAttemptId appAttemptId2 =
        appHelper(rm, cs, 100, 2, "b", "user2");
    //add app end

    RecordFactory recordFactory =
        RecordFactoryProvider.getRecordFactory(null);
    Priority priority = TestUtils.createMockPriority(1);

    //This will allocate for app1
    ResourceRequest r1 = TestUtils.createResourceRequest(
        ResourceRequest.ANY, 2 * GB, 1, true, priority, recordFactory);
    cs.allocate(appAttemptId1,
        Collections.<ResourceRequest>singletonList(r1),
        Collections.<ContainerId>emptyList(),
        null, null, null, null);
    CapacityScheduler.schedule(cs);

    // Thirteen 1 GB containers for the app in queue b.
    for (int i = 0; i < 13; i++) {
      ResourceRequest r2 = TestUtils.createResourceRequest(
          ResourceRequest.ANY, 1 * GB, 1, true, priority, recordFactory);
      cs.allocate(appAttemptId2,
          Collections.<ResourceRequest>singletonList(r2),
          Collections.<ContainerId>emptyList(),
          null, null, null, null);
      CapacityScheduler.schedule(cs);
    }
    assertEquals("A Used Resource should be 2 GB", 2 * GB,
        cs.getQueue("a").getUsedResources().getMemorySize());
    assertEquals("B Used Resource should be 13 GB", 13 * GB,
        cs.getQueue("b").getUsedResources().getMemorySize());

    // Only 1 GB is left: a wants 2 GB more, b wants 1 GB more.
    r1 = TestUtils.createResourceRequest(
        ResourceRequest.ANY, 2 * GB, 1, true, priority, recordFactory);
    ResourceRequest r2 = TestUtils.createResourceRequest(
        ResourceRequest.ANY, 1 * GB, 1, true, priority, recordFactory);
    cs.allocate(appAttemptId1,
        Collections.<ResourceRequest>singletonList(r1),
        Collections.<ContainerId>emptyList(),
        null, null, null, null);
    CapacityScheduler.schedule(cs);
    cs.allocate(appAttemptId2,
        Collections.<ResourceRequest>singletonList(r2),
        Collections.<ContainerId>emptyList(),
        null, null, null, null);
    CapacityScheduler.schedule(cs);

    //Check blocked Resource
    assertEquals("A Used Resource should be 2 GB", 2 * GB,
        cs.getQueue("a").getUsedResources().getMemorySize());
    assertEquals("B Used Resource should be 13 GB", 13 * GB,
        cs.getQueue("b").getUsedResources().getMemorySize());

    // Expire two of b's containers, then schedule again.
    ContainerId containerId1 = ContainerId.newContainerId(appAttemptId2, 10);
    ContainerId containerId2 = ContainerId.newContainerId(appAttemptId2, 11);
    cs.handle(new ContainerExpiredSchedulerEvent(containerId1));
    cs.handle(new ContainerExpiredSchedulerEvent(containerId2));
    CapacityScheduler.schedule(cs);
    rm.drainEvents();

    assertEquals("A Used Resource should be 4 GB", 4 * GB,
        cs.getQueue("a").getUsedResources().getMemorySize());
    assertEquals("B Used Resource should be 12 GB", 12 * GB,
        cs.getQueue("b").getUsedResources().getMemorySize());
    assertEquals("Used Resource on Root should be 16 GB", 16 * GB,
        cs.getRootQueue().getUsedResources().getMemorySize());
  } finally {
    // Always release the ResourceManager, even when an assertion fails.
    rm.stop();
  }
}
/**
 * Registers a mocked application and its first attempt with the scheduler.
 * The application is placed in the RM context's app map, then an
 * app-added event and an attempt-added event are delivered to {@code cs}.
 *
 * @param rm the mock ResourceManager whose context holds the app
 * @param cs the scheduler that receives the events
 * @param clusterTs cluster timestamp used to build the application id
 * @param appId application number; also used as the attempt number
 * @param queue name of the queue the application is submitted to
 * @param user name of the submitting user
 * @return the id of the registered application attempt
 */
private ApplicationAttemptId appHelper(MockRM rm, CapacityScheduler cs,
    int clusterTs, int appId, String queue,
    String user) {
  final ApplicationId applicationId =
      BuilderUtils.newApplicationId(clusterTs, appId);
  final ApplicationAttemptId attemptId =
      BuilderUtils.newApplicationAttemptId(applicationId, appId);
  final RMAppAttemptMetrics attemptMetrics =
      new RMAppAttemptMetrics(attemptId, rm.getRMContext());

  // Mocked attempt: master container, submission context, id and metrics.
  final RMAppAttemptImpl mockAttempt = mock(RMAppAttemptImpl.class);
  final Container masterContainer = mock(Container.class);
  final ApplicationSubmissionContext submissionContext =
      mock(ApplicationSubmissionContext.class);
  when(mockAttempt.getMasterContainer()).thenReturn(masterContainer);
  when(mockAttempt.getSubmissionContext()).thenReturn(submissionContext);
  when(mockAttempt.getAppAttemptId()).thenReturn(attemptId);
  when(mockAttempt.getRMAppAttemptMetrics()).thenReturn(attemptMetrics);

  // Mocked application that exposes the attempt above.
  final RMAppImpl mockApp = mock(RMAppImpl.class);
  when(mockApp.getApplicationId()).thenReturn(applicationId);
  when(mockApp.getCurrentAppAttempt()).thenReturn(mockAttempt);
  rm.getRMContext().getRMApps().put(applicationId, mockApp);

  // Tell the scheduler about the application, then about its attempt.
  SchedulerEvent appAdded =
      new AppAddedSchedulerEvent(applicationId, queue, user);
  cs.handle(appAdded);
  SchedulerEvent attemptAdded =
      new AppAttemptAddedSchedulerEvent(attemptId, false);
  cs.handle(attemptAdded);

  return attemptId;
}
}