YARN-6081. LeafQueue#getTotalPendingResourcesConsideringUserLimit should deduct reserved from pending to avoid unnecessary preemption of reserved container. Contributed by Wangda Tan.

This commit is contained in:
Sunil G 2017-01-13 18:22:29 +05:30
parent 1f344e0579
commit d3170f9eba
7 changed files with 202 additions and 69 deletions

View File

@ -51,6 +51,8 @@ public class TempQueuePerPartition extends AbstractPreemptionEntity {
LeafQueue leafQueue; LeafQueue leafQueue;
boolean preemptionDisabled; boolean preemptionDisabled;
protected Resource pendingDeductReserved;
TempQueuePerPartition(String queueName, Resource current, TempQueuePerPartition(String queueName, Resource current,
boolean preemptionDisabled, String partition, Resource killable, boolean preemptionDisabled, String partition, Resource killable,
float absCapacity, float absMaxCapacity, Resource totalPartitionResource, float absCapacity, float absMaxCapacity, Resource totalPartitionResource,
@ -61,10 +63,13 @@ public class TempQueuePerPartition extends AbstractPreemptionEntity {
if (queue instanceof LeafQueue) { if (queue instanceof LeafQueue) {
LeafQueue l = (LeafQueue) queue; LeafQueue l = (LeafQueue) queue;
pending = l.getTotalPendingResourcesConsideringUserLimit( pending = l.getTotalPendingResourcesConsideringUserLimit(
totalPartitionResource, partition); totalPartitionResource, partition, false);
pendingDeductReserved = l.getTotalPendingResourcesConsideringUserLimit(
totalPartitionResource, partition, true);
leafQueue = l; leafQueue = l;
} else { } else {
pending = Resources.createResource(0); pending = Resources.createResource(0);
pendingDeductReserved = Resources.createResource(0);
} }
this.normalizedGuarantee = Float.NaN; this.normalizedGuarantee = Float.NaN;
@ -95,16 +100,13 @@ public class TempQueuePerPartition extends AbstractPreemptionEntity {
assert leafQueue == null; assert leafQueue == null;
children.add(q); children.add(q);
Resources.addTo(pending, q.pending); Resources.addTo(pending, q.pending);
Resources.addTo(pendingDeductReserved, q.pendingDeductReserved);
} }
public ArrayList<TempQueuePerPartition> getChildren() { public ArrayList<TempQueuePerPartition> getChildren() {
return children; return children;
} }
public Resource getUsedDeductReservd() {
return Resources.subtract(current, reserved);
}
// This function "accepts" all the resources it can (pending) and return // This function "accepts" all the resources it can (pending) and return
// the unused ones // the unused ones
Resource offer(Resource avail, ResourceCalculator rc, Resource offer(Resource avail, ResourceCalculator rc,
@ -121,7 +123,8 @@ public class TempQueuePerPartition extends AbstractPreemptionEntity {
* When we're using FifoPreemptionSelector (considerReservedResource * When we're using FifoPreemptionSelector (considerReservedResource
* = false). * = false).
* *
* We should deduct reserved resource to avoid excessive preemption: * We should deduct reserved resource from pending to avoid excessive
* preemption:
* *
* For example, if an under-utilized queue has used = reserved = 20. * For example, if an under-utilized queue has used = reserved = 20.
* Preemption policy will try to preempt 20 containers (which is not * Preemption policy will try to preempt 20 containers (which is not
@ -131,10 +134,8 @@ public class TempQueuePerPartition extends AbstractPreemptionEntity {
* resource can be used by pending request, so policy will preempt * resource can be used by pending request, so policy will preempt
* resources repeatly. * resources repeatly.
*/ */
.subtract( .subtract(Resources.add(getUsed(),
Resources.add((considersReservedResource (considersReservedResource ? pending : pendingDeductReserved)),
? getUsed()
: getUsedDeductReservd()), pending),
idealAssigned))); idealAssigned)));
Resource remain = Resources.subtract(avail, accepted); Resource remain = Resources.subtract(avail, accepted);
Resources.addTo(idealAssigned, accepted); Resources.addTo(idealAssigned, accepted);

View File

@ -1209,6 +1209,14 @@ public class SchedulerApplicationAttempt implements SchedulableEntity {
return false; return false;
} }
/*
* Note that the behavior of appAttemptResourceUsage is different from queue's
* For queue, used = actual-used + reserved
* For app, used = actual-used.
*
* TODO (wangda): Need to make behaviors of queue/app's resource usage
* consistent
*/
@VisibleForTesting @VisibleForTesting
public ResourceUsage getAppAttemptResourceUsage() { public ResourceUsage getAppAttemptResourceUsage() {
return this.attemptResourceUsage; return this.attemptResourceUsage;

View File

@ -2155,38 +2155,64 @@ public class LeafQueue extends AbstractCSQueue {
return Collections.unmodifiableCollection(apps); return Collections.unmodifiableCollection(apps);
} }
// Consider the headroom for each user in the queue. /**
// Total pending for the queue = * Get total pending resource considering user limit for the leaf queue. This
// sum(for each user(min((user's headroom), sum(user's pending requests)))) * will be used for calculating pending resources in the preemption monitor.
// NOTE: Used for calculating pedning resources in the preemption monitor. *
* Consider the headroom for each user in the queue.
* Total pending for the queue =
* sum(for each user(min((user's headroom), sum(user's pending requests))))
* NOTE:
* @param clusterResources clusterResource
* @param partition node partition
* @param deductReservedFromPending When a container is reserved in CS,
* pending resource will not be deducted.
* This could lead to double accounting when
* doing preemption:
* In normal cases, we should deduct reserved
* resource from pending to avoid
* excessive preemption.
* @return Total pending resource considering user limit
*/
public Resource getTotalPendingResourcesConsideringUserLimit( public Resource getTotalPendingResourcesConsideringUserLimit(
Resource resources, String partition) { Resource clusterResources, String partition, boolean deductReservedFromPending) {
try { try {
readLock.lock(); readLock.lock();
Map<String, Resource> userNameToHeadroom = Map<String, Resource> userNameToHeadroom =
new HashMap<>(); new HashMap<>();
Resource pendingConsideringUserLimit = Resource.newInstance(0, 0); Resource totalPendingConsideringUserLimit = Resource.newInstance(0, 0);
for (FiCaSchedulerApp app : getApplications()) { for (FiCaSchedulerApp app : getApplications()) {
String userName = app.getUser(); String userName = app.getUser();
if (!userNameToHeadroom.containsKey(userName)) { if (!userNameToHeadroom.containsKey(userName)) {
User user = getUser(userName); User user = getUser(userName);
Resource headroom = Resources.subtract( Resource headroom = Resources.subtract(
computeUserLimit(app.getUser(), resources, user, partition, computeUserLimit(app.getUser(), clusterResources, user, partition,
SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY),
user.getUsed(partition)); user.getUsed(partition));
// Make sure headroom is not negative. // Make sure headroom is not negative.
headroom = Resources.componentwiseMax(headroom, Resources.none()); headroom = Resources.componentwiseMax(headroom, Resources.none());
userNameToHeadroom.put(userName, headroom); userNameToHeadroom.put(userName, headroom);
} }
// Check if we need to deduct reserved from pending
Resource pending = app.getAppAttemptResourceUsage().getPending(
partition);
if (deductReservedFromPending) {
pending = Resources.subtract(pending,
app.getAppAttemptResourceUsage().getReserved(partition));
}
pending = Resources.componentwiseMax(pending, Resources.none());
Resource minpendingConsideringUserLimit = Resources.componentwiseMin( Resource minpendingConsideringUserLimit = Resources.componentwiseMin(
userNameToHeadroom.get(userName), userNameToHeadroom.get(userName), pending);
app.getAppAttemptResourceUsage().getPending(partition)); Resources.addTo(totalPendingConsideringUserLimit,
Resources.addTo(pendingConsideringUserLimit,
minpendingConsideringUserLimit); minpendingConsideringUserLimit);
Resources.subtractFrom(userNameToHeadroom.get(userName), Resources.subtractFrom(userNameToHeadroom.get(userName),
minpendingConsideringUserLimit); minpendingConsideringUserLimit);
} }
return pendingConsideringUserLimit; return totalPendingConsideringUserLimit;
} finally { } finally {
readLock.unlock(); readLock.unlock();
} }

View File

@ -599,19 +599,21 @@ public class ProportionalCapacityPreemptionPolicyMockFramework {
qc.setUsedCapacity(partitionName, used); qc.setUsedCapacity(partitionName, used);
when(queue.getUsedCapacity()).thenReturn(used); when(queue.getUsedCapacity()).thenReturn(used);
ru.setPending(partitionName, pending); ru.setPending(partitionName, pending);
if (!isParent(queueExprArray, idx)) {
LeafQueue lq = (LeafQueue) queue;
when(lq.getTotalPendingResourcesConsideringUserLimit(isA(Resource.class),
isA(String.class))).thenReturn(pending);
}
ru.setUsed(partitionName, parseResourceFromString(values[2].trim()));
// Setup reserved resource if it contained by input config // Setup reserved resource if it contained by input config
Resource reserved = Resources.none(); Resource reserved = Resources.none();
if(values.length == 5) { if(values.length == 5) {
reserved = parseResourceFromString(values[4].trim()); reserved = parseResourceFromString(values[4].trim());
ru.setReserved(partitionName, reserved); ru.setReserved(partitionName, reserved);
} }
if (!isParent(queueExprArray, idx)) {
LeafQueue lq = (LeafQueue) queue;
when(lq.getTotalPendingResourcesConsideringUserLimit(isA(Resource.class),
isA(String.class), eq(false))).thenReturn(pending);
when(lq.getTotalPendingResourcesConsideringUserLimit(isA(Resource.class),
isA(String.class), eq(true))).thenReturn(
Resources.subtract(pending, reserved));
}
ru.setUsed(partitionName, parseResourceFromString(values[2].trim()));
LOG.debug("Setup queue=" + queueName + " partition=" + partitionName LOG.debug("Setup queue=" + queueName + " partition=" + partitionName
+ " [abs_guaranteed=" + absGuaranteed + ",abs_max=" + absMax + " [abs_guaranteed=" + absGuaranteed + ",abs_max=" + absMax

View File

@ -85,6 +85,7 @@ import static org.junit.Assert.fail;
import static org.mockito.Matchers.any; import static org.mockito.Matchers.any;
import static org.mockito.Matchers.anyString; import static org.mockito.Matchers.anyString;
import static org.mockito.Matchers.argThat; import static org.mockito.Matchers.argThat;
import static org.mockito.Matchers.eq;
import static org.mockito.Matchers.isA; import static org.mockito.Matchers.isA;
import static org.mockito.Mockito.mock; import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never; import static org.mockito.Mockito.never;
@ -989,7 +990,7 @@ public class TestProportionalCapacityPreemptionPolicy {
ProportionalCapacityPreemptionPolicy policy = buildPolicy(qData); ProportionalCapacityPreemptionPolicy policy = buildPolicy(qData);
policy.editSchedule(); policy.editSchedule();
verify(mDisp, times(10)).handle(argThat(new IsPreemptionRequestFor(appC))); verify(mDisp, times(9)).handle(argThat(new IsPreemptionRequestFor(appC)));
assertEquals(10, policy.getQueuePartitions().get("queueE") assertEquals(10, policy.getQueuePartitions().get("queueE")
.get("").preemptableExtra.getMemorySize()); .get("").preemptableExtra.getMemorySize());
//2nd level child(E) preempts 10, but parent A has only 9 extra //2nd level child(E) preempts 10, but parent A has only 9 extra
@ -1004,6 +1005,31 @@ public class TestProportionalCapacityPreemptionPolicy {
tempQueueAPartition.preemptableExtra.getMemorySize()); tempQueueAPartition.preemptableExtra.getMemorySize());
} }
@Test
public void testPreemptionNotHappenForSingleReservedQueue() {
/*
* Test case to make sure, when reserved > pending, preemption will not
* happen if there's only one demanding queue.
*/
int[][] qData = new int[][]{
// / A B C
{ 100, 40, 40, 20 }, // abs
{ 100, 100, 100, 100 }, // maxCap
{ 100, 70, 0, 0 }, // used
{ 10, 30, 0, 0 }, // pending
{ 0, 50, 0, 0 }, // reserved
{ 1, 1, 0, 0 }, // apps
{ -1, 1, 1, 1 }, // req granularity
{ 3, 0, 0, 0 }, // subqueues
};
ProportionalCapacityPreemptionPolicy policy = buildPolicy(qData);
policy.editSchedule();
// No preemption happens
verify(mDisp, never()).handle(argThat(new IsPreemptionRequestFor(appA)));
}
static class IsPreemptionRequestFor static class IsPreemptionRequestFor
extends ArgumentMatcher<ContainerPreemptEvent> { extends ArgumentMatcher<ContainerPreemptEvent> {
private final ApplicationAttemptId appAttId; private final ApplicationAttemptId appAttId;
@ -1224,7 +1250,14 @@ public class TestProportionalCapacityPreemptionPolicy {
List<ApplicationAttemptId> appAttemptIdList = List<ApplicationAttemptId> appAttemptIdList =
new ArrayList<ApplicationAttemptId>(); new ArrayList<ApplicationAttemptId>();
when(lq.getTotalPendingResourcesConsideringUserLimit(isA(Resource.class), when(lq.getTotalPendingResourcesConsideringUserLimit(isA(Resource.class),
isA(String.class))).thenReturn(pending[i]); isA(String.class), eq(false))).thenReturn(pending[i]);
when(lq.getTotalPendingResourcesConsideringUserLimit(isA(Resource.class),
isA(String.class), eq(true))).thenReturn(Resources.componentwiseMax(
Resources.subtract(pending[i],
reserved[i] == null ? Resources.none() : reserved[i]),
Resources.none()));
// need to set pending resource in resource usage as well // need to set pending resource in resource usage as well
ResourceUsage ru = new ResourceUsage(); ResourceUsage ru = new ResourceUsage();
ru.setPending(pending[i]); ru.setPending(pending[i]);
@ -1360,27 +1393,4 @@ public class TestProportionalCapacityPreemptionPolicy {
return ret; return ret;
} }
void printString(CSQueue nq, String indent) {
if (nq instanceof ParentQueue) {
System.out.println(indent + nq.getQueueName()
+ " cur:" + nq.getAbsoluteUsedCapacity()
+ " guar:" + nq.getAbsoluteCapacity()
);
for (CSQueue q : ((ParentQueue)nq).getChildQueues()) {
printString(q, indent + " ");
}
} else {
System.out.println(indent + nq.getQueueName()
+ " pen:"
+ ((LeafQueue) nq).getTotalPendingResourcesConsideringUserLimit(
isA(Resource.class), isA(String.class))
+ " cur:" + nq.getAbsoluteUsedCapacity()
+ " guar:" + nq.getAbsoluteCapacity()
);
for (FiCaSchedulerApp a : ((LeafQueue)nq).getApplications()) {
System.out.println(indent + " " + a.getApplicationId());
}
}
}
} }

View File

@ -56,6 +56,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeRemoved
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateSchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateSchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager; import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager;
import org.apache.hadoop.yarn.server.utils.BuilderUtils; import org.apache.hadoop.yarn.server.utils.BuilderUtils;
import org.apache.hadoop.yarn.util.resource.Resources;
import org.junit.Assert; import org.junit.Assert;
import org.junit.Before; import org.junit.Before;
import org.junit.Test; import org.junit.Test;
@ -691,4 +692,88 @@ public class TestContainerAllocation {
rm1.close(); rm1.close();
} }
@Test
public void testPendingResourcesConsideringUserLimit() throws Exception {
// Set maximum capacity of A to 10
CapacitySchedulerConfiguration newConf = new CapacitySchedulerConfiguration(
conf);
newConf.setUserLimitFactor(CapacitySchedulerConfiguration.ROOT + ".default",
0.5f);
newConf.setMaximumAMResourcePercentPerPartition(
CapacitySchedulerConfiguration.ROOT + ".default", "", 1.0f);
MockRM rm1 = new MockRM(newConf);
rm1.getRMContext().setNodeLabelManager(mgr);
rm1.start();
MockNM nm1 = rm1.registerNode("h1:1234", 8 * GB);
MockNM nm2 = rm1.registerNode("h2:1234", 8 * GB);
// launch an app to queue default, AM container should be launched in nm1
RMApp app1 = rm1.submitApp(2 * GB, "app", "u1", null, "default");
MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1);
// launch 2nd app to queue default, AM container should be launched in nm1
RMApp app2 = rm1.submitApp(4 * GB, "app", "u2", null, "default");
MockAM am2 = MockRM.launchAndRegisterAM(app2, rm1, nm1);
// am1 asks 1 * 3G container
am1.allocate("*", 3 * GB, 1, null);
// am2 asks 4 * 5G container
am2.allocate("*", 5 * GB, 4, null);
CapacityScheduler cs = (CapacityScheduler) rm1.getResourceScheduler();
RMNode rmNode1 = rm1.getRMContext().getRMNodes().get(nm1.getNodeId());
RMNode rmNode2 = rm1.getRMContext().getRMNodes().get(nm2.getNodeId());
// Do node heartbeats one, we expect one container allocated reserved on nm1
cs.handle(new NodeUpdateSchedulerEvent(rmNode1));
FiCaSchedulerApp schedulerApp1 =
cs.getApplicationAttempt(am1.getApplicationAttemptId());
// App1 will get 1 container reserved
Assert.assertEquals(1, schedulerApp1.getReservedContainers().size());
/*
* Note that the behavior of appAttemptResourceUsage is different from queue's
* For queue, used = actual-used + reserved
* For app, used = actual-used.
*
* TODO (wangda): Need to make behaviors of queue/app's resource usage
* consistent
*/
Assert.assertEquals(2 * GB,
schedulerApp1.getAppAttemptResourceUsage().getUsed().getMemorySize());
Assert.assertEquals(3 * GB,
schedulerApp1.getAppAttemptResourceUsage().getReserved()
.getMemorySize());
Assert.assertEquals(3 * GB,
schedulerApp1.getAppAttemptResourceUsage().getPending()
.getMemorySize());
FiCaSchedulerApp schedulerApp2 =
cs.getApplicationAttempt(am2.getApplicationAttemptId());
Assert.assertEquals(4 * GB,
schedulerApp2.getAppAttemptResourceUsage().getUsed().getMemorySize());
Assert.assertEquals(0 * GB,
schedulerApp2.getAppAttemptResourceUsage().getReserved()
.getMemorySize());
Assert.assertEquals(5 * 4 * GB,
schedulerApp2.getAppAttemptResourceUsage().getPending()
.getMemorySize());
LeafQueue lq = (LeafQueue) cs.getQueue("default");
// UL = 8GB, so head room of u1 = 8GB - 2GB (AM) - 3GB (Reserved) = 3GB
// u2 = 8GB - 4GB = 4GB
// When not deduct reserved, total-pending = 3G (u1) + 4G (u2) = 7G
// deduct reserved, total-pending = 0G (u1) + 4G = 4G
Assert.assertEquals(7 * GB, lq.getTotalPendingResourcesConsideringUserLimit(
Resources.createResource(20 * GB), "", false).getMemorySize());
Assert.assertEquals(4 * GB, lq.getTotalPendingResourcesConsideringUserLimit(
Resources.createResource(20 * GB), "", true).getMemorySize());
rm1.close();
}
} }

View File

@ -3283,8 +3283,9 @@ public class TestLeafQueue {
// even though user_0's apps are still asking for a total of 4GB. // even though user_0's apps are still asking for a total of 4GB.
assertEquals(1*GB, app_0.getCurrentConsumption().getMemorySize()); assertEquals(1*GB, app_0.getCurrentConsumption().getMemorySize());
assertEquals(0*GB, app_1.getCurrentConsumption().getMemorySize()); assertEquals(0*GB, app_1.getCurrentConsumption().getMemorySize());
assertEquals(0*GB, e.getTotalPendingResourcesConsideringUserLimit( assertEquals(0 * GB,
clusterResource, RMNodeLabelsManager.NO_LABEL).getMemorySize()); e.getTotalPendingResourcesConsideringUserLimit(clusterResource,
RMNodeLabelsManager.NO_LABEL, false).getMemorySize());
// Assign 2nd container of 1GB // Assign 2nd container of 1GB
applyCSAssignment(clusterResource, applyCSAssignment(clusterResource,
@ -3298,7 +3299,7 @@ public class TestLeafQueue {
assertEquals(2*GB, app_0.getCurrentConsumption().getMemorySize()); assertEquals(2*GB, app_0.getCurrentConsumption().getMemorySize());
assertEquals(0*GB, app_1.getCurrentConsumption().getMemorySize()); assertEquals(0*GB, app_1.getCurrentConsumption().getMemorySize());
assertEquals(0*GB, e.getTotalPendingResourcesConsideringUserLimit( assertEquals(0*GB, e.getTotalPendingResourcesConsideringUserLimit(
clusterResource, RMNodeLabelsManager.NO_LABEL).getMemorySize()); clusterResource, RMNodeLabelsManager.NO_LABEL, false).getMemorySize());
// Can't allocate 3rd container due to user-limit. Headroom still 0. // Can't allocate 3rd container due to user-limit. Headroom still 0.
applyCSAssignment(clusterResource, applyCSAssignment(clusterResource,
@ -3308,7 +3309,7 @@ public class TestLeafQueue {
assertEquals(2*GB, app_0.getCurrentConsumption().getMemorySize()); assertEquals(2*GB, app_0.getCurrentConsumption().getMemorySize());
assertEquals(0*GB, app_1.getCurrentConsumption().getMemorySize()); assertEquals(0*GB, app_1.getCurrentConsumption().getMemorySize());
assertEquals(0*GB, e.getTotalPendingResourcesConsideringUserLimit( assertEquals(0*GB, e.getTotalPendingResourcesConsideringUserLimit(
clusterResource, RMNodeLabelsManager.NO_LABEL).getMemorySize()); clusterResource, RMNodeLabelsManager.NO_LABEL, false).getMemorySize());
// Increase user-limit-factor from 1GB to 10GB (1% * 10 * 100GB = 10GB). // Increase user-limit-factor from 1GB to 10GB (1% * 10 * 100GB = 10GB).
// Pending for both app_0 and app_1 are still 3GB, so user-limit-factor // Pending for both app_0 and app_1 are still 3GB, so user-limit-factor
@ -3316,7 +3317,7 @@ public class TestLeafQueue {
// getTotalPendingResourcesConsideringUserLimit() // getTotalPendingResourcesConsideringUserLimit()
e.setUserLimitFactor(10.0f); e.setUserLimitFactor(10.0f);
assertEquals(3*GB, e.getTotalPendingResourcesConsideringUserLimit( assertEquals(3*GB, e.getTotalPendingResourcesConsideringUserLimit(
clusterResource, RMNodeLabelsManager.NO_LABEL).getMemorySize()); clusterResource, RMNodeLabelsManager.NO_LABEL, false).getMemorySize());
applyCSAssignment(clusterResource, applyCSAssignment(clusterResource,
e.assignContainers(clusterResource, node_0, e.assignContainers(clusterResource, node_0,
@ -3326,7 +3327,7 @@ public class TestLeafQueue {
assertEquals(3*GB, app_0.getCurrentConsumption().getMemorySize()); assertEquals(3*GB, app_0.getCurrentConsumption().getMemorySize());
assertEquals(0*GB, app_1.getCurrentConsumption().getMemorySize()); assertEquals(0*GB, app_1.getCurrentConsumption().getMemorySize());
assertEquals(2*GB, e.getTotalPendingResourcesConsideringUserLimit( assertEquals(2*GB, e.getTotalPendingResourcesConsideringUserLimit(
clusterResource, RMNodeLabelsManager.NO_LABEL).getMemorySize()); clusterResource, RMNodeLabelsManager.NO_LABEL, false).getMemorySize());
// Get the last 2 containers for app_1, no more pending requests. // Get the last 2 containers for app_1, no more pending requests.
applyCSAssignment(clusterResource, applyCSAssignment(clusterResource,
@ -3340,7 +3341,7 @@ public class TestLeafQueue {
assertEquals(3*GB, app_0.getCurrentConsumption().getMemorySize()); assertEquals(3*GB, app_0.getCurrentConsumption().getMemorySize());
assertEquals(2*GB, app_1.getCurrentConsumption().getMemorySize()); assertEquals(2*GB, app_1.getCurrentConsumption().getMemorySize());
assertEquals(0*GB, e.getTotalPendingResourcesConsideringUserLimit( assertEquals(0*GB, e.getTotalPendingResourcesConsideringUserLimit(
clusterResource, RMNodeLabelsManager.NO_LABEL).getMemorySize()); clusterResource, RMNodeLabelsManager.NO_LABEL, false).getMemorySize());
// Release each container from app_0 // Release each container from app_0
for (RMContainer rmContainer : app_0.getLiveContainers()) { for (RMContainer rmContainer : app_0.getLiveContainers()) {
@ -3449,7 +3450,7 @@ public class TestLeafQueue {
// With queue capacity set at 1% of 100GB and user-limit-factor set to 1.0, // With queue capacity set at 1% of 100GB and user-limit-factor set to 1.0,
// queue 'e' should be able to consume 1GB per user. // queue 'e' should be able to consume 1GB per user.
assertEquals(2*GB, e.getTotalPendingResourcesConsideringUserLimit( assertEquals(2*GB, e.getTotalPendingResourcesConsideringUserLimit(
clusterResource, RMNodeLabelsManager.NO_LABEL).getMemorySize()); clusterResource, RMNodeLabelsManager.NO_LABEL, false).getMemorySize());
// None of the apps have assigned resources // None of the apps have assigned resources
// user_0's apps: // user_0's apps:
assertEquals(0*GB, app_0.getCurrentConsumption().getMemorySize()); assertEquals(0*GB, app_0.getCurrentConsumption().getMemorySize());
@ -3466,7 +3467,7 @@ public class TestLeafQueue {
// The first container was assigned to user_0's app_0. Queues total headroom // The first container was assigned to user_0's app_0. Queues total headroom
// has 1GB left for user_1. // has 1GB left for user_1.
assertEquals(1*GB, e.getTotalPendingResourcesConsideringUserLimit( assertEquals(1*GB, e.getTotalPendingResourcesConsideringUserLimit(
clusterResource, RMNodeLabelsManager.NO_LABEL).getMemorySize()); clusterResource, RMNodeLabelsManager.NO_LABEL, false).getMemorySize());
// user_0's apps: // user_0's apps:
assertEquals(1*GB, app_0.getCurrentConsumption().getMemorySize()); assertEquals(1*GB, app_0.getCurrentConsumption().getMemorySize());
assertEquals(0*GB, app_1.getCurrentConsumption().getMemorySize()); assertEquals(0*GB, app_1.getCurrentConsumption().getMemorySize());
@ -3484,7 +3485,7 @@ public class TestLeafQueue {
// this container went to user_0's app_1. so, headroom for queue 'e'e is // this container went to user_0's app_1. so, headroom for queue 'e'e is
// still 1GB for user_1 // still 1GB for user_1
assertEquals(1*GB, e.getTotalPendingResourcesConsideringUserLimit( assertEquals(1*GB, e.getTotalPendingResourcesConsideringUserLimit(
clusterResource, RMNodeLabelsManager.NO_LABEL).getMemorySize()); clusterResource, RMNodeLabelsManager.NO_LABEL, false).getMemorySize());
// user_0's apps: // user_0's apps:
assertEquals(1*GB, app_0.getCurrentConsumption().getMemorySize()); assertEquals(1*GB, app_0.getCurrentConsumption().getMemorySize());
assertEquals(1*GB, app_1.getCurrentConsumption().getMemorySize()); assertEquals(1*GB, app_1.getCurrentConsumption().getMemorySize());
@ -3500,7 +3501,7 @@ public class TestLeafQueue {
// Container was allocated to user_1's app_2 since user_1, Now, no headroom // Container was allocated to user_1's app_2 since user_1, Now, no headroom
// is left. // is left.
assertEquals(0*GB, e.getTotalPendingResourcesConsideringUserLimit( assertEquals(0*GB, e.getTotalPendingResourcesConsideringUserLimit(
clusterResource, RMNodeLabelsManager.NO_LABEL).getMemorySize()); clusterResource, RMNodeLabelsManager.NO_LABEL, false).getMemorySize());
// user_0's apps: // user_0's apps:
assertEquals(1*GB, app_0.getCurrentConsumption().getMemorySize()); assertEquals(1*GB, app_0.getCurrentConsumption().getMemorySize());
assertEquals(1*GB, app_1.getCurrentConsumption().getMemorySize()); assertEquals(1*GB, app_1.getCurrentConsumption().getMemorySize());
@ -3516,7 +3517,7 @@ public class TestLeafQueue {
// Allocated to user_1's app_2 since scheduler allocates 1 container // Allocated to user_1's app_2 since scheduler allocates 1 container
// above user resource limit. Available headroom still 0. // above user resource limit. Available headroom still 0.
assertEquals(0*GB, e.getTotalPendingResourcesConsideringUserLimit( assertEquals(0*GB, e.getTotalPendingResourcesConsideringUserLimit(
clusterResource, RMNodeLabelsManager.NO_LABEL).getMemorySize()); clusterResource, RMNodeLabelsManager.NO_LABEL, false).getMemorySize());
// user_0's apps: // user_0's apps:
long app_0_consumption = app_0.getCurrentConsumption().getMemorySize(); long app_0_consumption = app_0.getCurrentConsumption().getMemorySize();
assertEquals(1*GB, app_0_consumption); assertEquals(1*GB, app_0_consumption);
@ -3536,7 +3537,7 @@ public class TestLeafQueue {
// Cannot allocate 5th container because both users are above their allowed // Cannot allocate 5th container because both users are above their allowed
// user resource limit. Values should be the same as previously. // user resource limit. Values should be the same as previously.
assertEquals(0*GB, e.getTotalPendingResourcesConsideringUserLimit( assertEquals(0*GB, e.getTotalPendingResourcesConsideringUserLimit(
clusterResource, RMNodeLabelsManager.NO_LABEL).getMemorySize()); clusterResource, RMNodeLabelsManager.NO_LABEL, false).getMemorySize());
// user_0's apps: // user_0's apps:
assertEquals(app_0_consumption, app_0.getCurrentConsumption().getMemorySize()); assertEquals(app_0_consumption, app_0.getCurrentConsumption().getMemorySize());
assertEquals(app_1_consumption, app_1.getCurrentConsumption().getMemorySize()); assertEquals(app_1_consumption, app_1.getCurrentConsumption().getMemorySize());
@ -3555,7 +3556,7 @@ public class TestLeafQueue {
SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY), e, nodes, apps); SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY), e, nodes, apps);
// Next container goes to user_0's app_1, since it still wanted 1GB. // Next container goes to user_0's app_1, since it still wanted 1GB.
assertEquals(1*GB, e.getTotalPendingResourcesConsideringUserLimit( assertEquals(1*GB, e.getTotalPendingResourcesConsideringUserLimit(
clusterResource, RMNodeLabelsManager.NO_LABEL).getMemorySize()); clusterResource, RMNodeLabelsManager.NO_LABEL, false).getMemorySize());
// user_0's apps: // user_0's apps:
assertEquals(1*GB, app_0.getCurrentConsumption().getMemorySize()); assertEquals(1*GB, app_0.getCurrentConsumption().getMemorySize());
assertEquals(2*GB, app_1.getCurrentConsumption().getMemorySize()); assertEquals(2*GB, app_1.getCurrentConsumption().getMemorySize());
@ -3570,7 +3571,7 @@ public class TestLeafQueue {
// Last container goes to user_1's app_3, since it still wanted 1GB. // Last container goes to user_1's app_3, since it still wanted 1GB.
// user_0's apps: // user_0's apps:
assertEquals(0*GB, e.getTotalPendingResourcesConsideringUserLimit( assertEquals(0*GB, e.getTotalPendingResourcesConsideringUserLimit(
clusterResource, RMNodeLabelsManager.NO_LABEL).getMemorySize()); clusterResource, RMNodeLabelsManager.NO_LABEL, false).getMemorySize());
assertEquals(1*GB, app_0.getCurrentConsumption().getMemorySize()); assertEquals(1*GB, app_0.getCurrentConsumption().getMemorySize());
assertEquals(2*GB, app_1.getCurrentConsumption().getMemorySize()); assertEquals(2*GB, app_1.getCurrentConsumption().getMemorySize());
// user_1's apps: // user_1's apps: