YARN-4865. Track Reserved resources in ResourceUsage and QueueCapacities. (Sunil G via wangda)

This commit is contained in:
Wangda Tan 2016-03-29 17:07:55 -07:00 committed by Eric Payne
parent 3b80424d4f
commit f9ef3e3719
10 changed files with 220 additions and 5 deletions

View File

@ -118,4 +118,24 @@ public interface Queue {
* @return default application priority
*/
public Priority getDefaultApplicationPriority();
/**
* Increment Reserved Capacity
*
* @param partition
* asked by application
* @param reservedRes
* reserved resource asked
*/
public void incReservedResource(String partition, Resource reservedRes);
/**
* Decrement Reserved Capacity
*
* @param partition
* asked by application
* @param reservedRes
* reserved resource asked
*/
public void decReservedResource(String partition, Resource reservedRes);
}

View File

@ -537,6 +537,30 @@ public abstract class AbstractCSQueue implements CSQueue {
return true;
}
@Override
public void incReservedResource(String partition, Resource reservedRes) {
if (partition == null) {
partition = RMNodeLabelsManager.NO_LABEL;
}
queueUsage.incReserved(partition, reservedRes);
if(null != parent){
parent.incReservedResource(partition, reservedRes);
}
}
@Override
public void decReservedResource(String partition, Resource reservedRes) {
if (partition == null) {
partition = RMNodeLabelsManager.NO_LABEL;
}
queueUsage.decReserved(partition, reservedRes);
if(null != parent){
parent.decReservedResource(partition, reservedRes);
}
}
@Override
public void incPendingResource(String nodeLabel, Resource resourceToInc) {
if (nodeLabel == null) {

View File

@ -186,6 +186,8 @@ class CSQueueUtils {
String nodePartition) {
float absoluteUsedCapacity = 0.0f;
float usedCapacity = 0.0f;
float reservedCapacity = 0.0f;
float absoluteReservedCapacity = 0.0f;
if (Resources.greaterThan(rc, totalPartitionResource,
totalPartitionResource, Resources.none())) {
@ -207,11 +209,22 @@ class CSQueueUtils {
usedCapacity =
Resources.divide(rc, totalPartitionResource, usedResource,
queueGuranteedResource);
Resource resResource = queueResourceUsage.getReserved(nodePartition);
reservedCapacity =
Resources.divide(rc, totalPartitionResource, resResource,
queueGuranteedResource);
absoluteReservedCapacity =
Resources.divide(rc, totalPartitionResource, resResource,
totalPartitionResource);
}
queueCapacities
.setAbsoluteUsedCapacity(nodePartition, absoluteUsedCapacity);
queueCapacities.setUsedCapacity(nodePartition, usedCapacity);
queueCapacities.setReservedCapacity(nodePartition, reservedCapacity);
queueCapacities
.setAbsoluteReservedCapacity(nodePartition, absoluteReservedCapacity);
}
private static Resource getMaxAvailableResourceToQueue(

View File

@ -994,6 +994,13 @@ public class LeafQueue extends AbstractCSQueue {
node.getPartition(), reservedOrAllocatedRMContainer,
assignment.isIncreasedAllocation());
// Update reserved metrics
Resource reservedRes = assignment.getAssignmentInformation()
.getReserved();
if (reservedRes != null && !reservedRes.equals(Resources.none())) {
incReservedResource(node.getPartition(), reservedRes);
}
// Done
return assignment;
} else if (assignment.getSkippedType()
@ -1434,6 +1441,13 @@ public class LeafQueue extends AbstractCSQueue {
// Book-keeping
if (removed) {
// track reserved resource for metrics, for normal container
// getReservedResource will be null.
Resource reservedRes = rmContainer.getReservedResource();
if (reservedRes != null && !reservedRes.equals(Resources.none())) {
decReservedResource(node.getPartition(), reservedRes);
}
// Inform the ordering policy
orderingPolicy.containerReleased(application, rmContainer);

View File

@ -50,7 +50,7 @@ public class QueueCapacities {
// Usage enum here to make implement cleaner
private enum CapacityType {
USED_CAP(0), ABS_USED_CAP(1), MAX_CAP(2), ABS_MAX_CAP(3), CAP(4), ABS_CAP(5),
MAX_AM_PERC(6);
MAX_AM_PERC(6), RESERVED_CAP(7), ABS_RESERVED_CAP(8);
private int idx;
@ -76,6 +76,8 @@ public class QueueCapacities {
sb.append("cap=" + capacitiesArr[4] + "%, ");
sb.append("abs_cap=" + capacitiesArr[5] + "%}");
sb.append("max_am_perc=" + capacitiesArr[6] + "%}");
sb.append("reserved_cap=" + capacitiesArr[7] + "%}");
sb.append("abs_reserved_cap=" + capacitiesArr[8] + "%}");
return sb.toString();
}
}
@ -234,6 +236,40 @@ public class QueueCapacities {
_set(NL, CapacityType.MAX_AM_PERC, value);
}
/* Reserved Capacity Getter and Setter */
public float getReservedCapacity() {
return _get(NL, CapacityType.RESERVED_CAP);
}
public float getReservedCapacity(String label) {
return _get(label, CapacityType.RESERVED_CAP);
}
public void setReservedCapacity(float value) {
_set(NL, CapacityType.RESERVED_CAP, value);
}
public void setReservedCapacity(String label, float value) {
_set(label, CapacityType.RESERVED_CAP, value);
}
/* Absolute Reserved Capacity Getter and Setter */
public float getAbsoluteReservedCapacity() {
return _get(NL, CapacityType.ABS_RESERVED_CAP);
}
public float getAbsoluteReservedCapacity(String label) {
return _get(label, CapacityType.ABS_RESERVED_CAP);
}
public void setAbsoluteReservedCapacity(float value) {
_set(NL, CapacityType.ABS_RESERVED_CAP, value);
}
public void setAbsoluteReservedCapacity(String label, float value) {
_set(label, CapacityType.ABS_RESERVED_CAP, value);
}
/**
* Clear configurable fields, like
* (absolute)capacity/(absolute)maximum-capacity, this will be used by queue

View File

@ -331,6 +331,14 @@ public abstract class FSQueue implements Queue, Schedulable {
public void decPendingResource(String nodeLabel, Resource resourceToDec) {
}
@Override
public void incReservedResource(String nodeLabel, Resource resourceToInc) {
}
@Override
public void decReservedResource(String nodeLabel, Resource resourceToDec) {
}
@Override
public Priority getDefaultApplicationPriority() {
// TODO add implementation for FSParentQueue

View File

@ -219,6 +219,18 @@ public class FifoScheduler extends
// TODO add implementation for FIFO scheduler
return null;
}
@Override
public void incReservedResource(String partition, Resource reservedRes) {
// TODO add implementation for FIFO scheduler
}
@Override
public void decReservedResource(String partition, Resource reservedRes) {
// TODO add implementation for FIFO scheduler
}
};
public FifoScheduler() {

View File

@ -368,6 +368,7 @@ public class TestContainerAllocation {
CapacityScheduler cs = (CapacityScheduler) rm1.getResourceScheduler();
RMNode rmNode1 = rm1.getRMContext().getRMNodes().get(nm1.getNodeId());
LeafQueue leafQueue = (LeafQueue) cs.getQueue("default");
// Do node heartbeats 2 times
// First time will allocate container for app1, second time will reserve
@ -394,6 +395,10 @@ public class TestContainerAllocation {
// Usage of queue = 4G + 2 * 1G + 4G (reserved)
Assert.assertEquals(10 * GB, cs.getRootQueue().getQueueResourceUsage()
.getUsed().getMemorySize());
Assert.assertEquals(4 * GB, cs.getRootQueue().getQueueResourceUsage()
.getReserved().getMemorySize());
Assert.assertEquals(4 * GB, leafQueue.getQueueResourceUsage().getReserved()
.getMemorySize());
// Cancel asks of app2 and re-kick RM
am2.allocate("*", 4 * GB, 0, new ArrayList<ContainerId>());
@ -406,6 +411,10 @@ public class TestContainerAllocation {
Assert.assertNull(cs.getNode(nm1.getNodeId()).getReservedContainer());
Assert.assertEquals(6 * GB, cs.getRootQueue().getQueueResourceUsage()
.getUsed().getMemorySize());
Assert.assertEquals(0, cs.getRootQueue().getQueueResourceUsage()
.getReserved().getMemorySize());
Assert.assertEquals(0, leafQueue.getQueueResourceUsage().getReserved()
.getMemorySize());
rm1.close();
}

View File

@ -466,6 +466,83 @@ public class TestNodeLabelContainerAllocation {
rm1.close();
}
@Test (timeout = 120000)
public void testContainerReservationWithLabels() throws Exception {
// This test is pretty much similar to testContainerAllocateWithLabel.
// Difference is, this test doesn't specify label expression in
// ResourceRequest,
// instead, it uses default queue label expression
// set node -> label
mgr.addToCluserNodeLabelsWithDefaultExclusivity(ImmutableSet.of("x", "y",
"z"));
mgr.addLabelsToNode(ImmutableMap.of(NodeId.newInstance("h1", 0),
toSet("x"), NodeId.newInstance("h2", 0), toSet("y"),
NodeId.newInstance("h3", 0), toSet("x")));
// inject node label manager
MockRM rm1 = new MockRM(
TestUtils.getConfigurationWithDefaultQueueLabels(conf)) {
@Override
public RMNodeLabelsManager createNodeLabelManager() {
return mgr;
}
};
rm1.getRMContext().setNodeLabelManager(mgr);
rm1.start();
MockNM nm1 = rm1.registerNode("h1:1234", 8 * GB); // label = x
rm1.registerNode("h2:1234", 8 * GB); // label = y
rm1.registerNode("h3:1234", 8 * GB); // label = x
ContainerId containerId;
// launch an app to queue a1 (label = x), and check all container will
// be allocated in h1
RMApp app1 = rm1.submitApp(1 * GB, "app", "user", null, "a1");
MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1);
// request a container.
am1.allocate("*", 4 * GB, 2, new ArrayList<ContainerId>());
containerId = ContainerId.newContainerId(am1.getApplicationAttemptId(), 2);
CapacityScheduler cs = (CapacityScheduler) rm1.getResourceScheduler();
RMNode rmNode1 = rm1.getRMContext().getRMNodes().get(nm1.getNodeId());
LeafQueue leafQueue = (LeafQueue) cs.getQueue("a1");
// Do node heartbeats 2 times
// First time will allocate container for app1, second time will reserve
// container for app1
cs.handle(new NodeUpdateSchedulerEvent(rmNode1));
cs.handle(new NodeUpdateSchedulerEvent(rmNode1));
checkTaskContainersHost(am1.getApplicationAttemptId(), containerId, rm1,
"h1");
// Check if a 4G container allocated for app1, and 4G is reserved
FiCaSchedulerApp schedulerApp1 = cs.getApplicationAttempt(am1
.getApplicationAttemptId());
Assert.assertEquals(2, schedulerApp1.getLiveContainers().size());
Assert.assertTrue(schedulerApp1.getReservedContainers().size() > 0);
Assert.assertEquals(9 * GB, cs.getRootQueue().getQueueResourceUsage()
.getUsed("x").getMemorySize());
Assert.assertEquals(4 * GB, cs.getRootQueue().getQueueResourceUsage()
.getReserved("x").getMemorySize());
Assert.assertEquals(4 * GB,
leafQueue.getQueueResourceUsage().getReserved("x").getMemorySize());
// Cancel asks of app2 and re-kick RM
am1.allocate("*", 4 * GB, 0, new ArrayList<ContainerId>());
cs.handle(new NodeUpdateSchedulerEvent(rmNode1));
Assert.assertEquals(5 * GB, cs.getRootQueue().getQueueResourceUsage()
.getUsed("x").getMemorySize());
Assert.assertEquals(0, cs.getRootQueue().getQueueResourceUsage()
.getReserved("x").getMemorySize());
Assert.assertEquals(0, leafQueue.getQueueResourceUsage().getReserved("x")
.getMemorySize());
rm1.close();
}
private void checkPendingResource(MockRM rm, int priority,
ApplicationAttemptId attemptId, int memory) {
CapacityScheduler cs = (CapacityScheduler) rm.getRMContext().getScheduler();

View File

@ -44,7 +44,9 @@ public class TestQueueCapacities {
{ "AbsoluteUsedCapacity" },
{ "MaximumCapacity" },
{ "AbsoluteMaximumCapacity" },
{ "MaxAMResourcePercentage" } });
{ "MaxAMResourcePercentage" },
{ "ReservedCapacity" },
{ "AbsoluteReservedCapacity" }});
}
public TestQueueCapacities(String suffix) {