YARN-6081. LeafQueue#getTotalPendingResourcesConsideringUserLimit should deduct reserved from pending to avoid unnecessary preemption of reserved container. Contributed by Wangda Tan.

(cherry picked from commit d3170f9eba)
This commit is contained in:
Sunil G 2017-01-13 18:22:29 +05:30
parent 963ef1e31f
commit f166bb8f09
7 changed files with 202 additions and 69 deletions

View File

@ -51,6 +51,8 @@ public class TempQueuePerPartition extends AbstractPreemptionEntity {
LeafQueue leafQueue;
boolean preemptionDisabled;
protected Resource pendingDeductReserved;
TempQueuePerPartition(String queueName, Resource current,
boolean preemptionDisabled, String partition, Resource killable,
float absCapacity, float absMaxCapacity, Resource totalPartitionResource,
@ -61,10 +63,13 @@ public class TempQueuePerPartition extends AbstractPreemptionEntity {
if (queue instanceof LeafQueue) {
LeafQueue l = (LeafQueue) queue;
pending = l.getTotalPendingResourcesConsideringUserLimit(
totalPartitionResource, partition);
totalPartitionResource, partition, false);
pendingDeductReserved = l.getTotalPendingResourcesConsideringUserLimit(
totalPartitionResource, partition, true);
leafQueue = l;
} else {
pending = Resources.createResource(0);
pendingDeductReserved = Resources.createResource(0);
}
this.normalizedGuarantee = Float.NaN;
@ -95,16 +100,13 @@ public class TempQueuePerPartition extends AbstractPreemptionEntity {
assert leafQueue == null;
children.add(q);
Resources.addTo(pending, q.pending);
Resources.addTo(pendingDeductReserved, q.pendingDeductReserved);
}
/**
* Returns the child queue entries that have been added to this queue.
*
* @return the {@code children} list held by this entity (the list itself,
*         not a copy)
*/
public ArrayList<TempQueuePerPartition> getChildren() {
return children;
}
/**
* Computes this queue's current usage with the reserved amount taken out.
*
* @return the result of {@code Resources.subtract(current, reserved)}
*/
public Resource getUsedDeductReservd() {
return Resources.subtract(current, reserved);
}
// This function "accepts" all the resources it can (pending) and return
// the unused ones
Resource offer(Resource avail, ResourceCalculator rc,
@ -121,7 +123,8 @@ public class TempQueuePerPartition extends AbstractPreemptionEntity {
* When we're using FifoPreemptionSelector (considerReservedResource
* = false).
*
* We should deduct reserved resource to avoid excessive preemption:
* We should deduct reserved resource from pending to avoid excessive
* preemption:
*
* For example, if an under-utilized queue has used = reserved = 20.
* Preemption policy will try to preempt 20 containers (which is not
@ -131,10 +134,8 @@ public class TempQueuePerPartition extends AbstractPreemptionEntity {
* resource can be used by pending request, so policy will preempt
* resources repeatedly.
*/
.subtract(
Resources.add((considersReservedResource
? getUsed()
: getUsedDeductReservd()), pending),
.subtract(Resources.add(getUsed(),
(considersReservedResource ? pending : pendingDeductReserved)),
idealAssigned)));
Resource remain = Resources.subtract(avail, accepted);
Resources.addTo(idealAssigned, accepted);

View File

@ -1072,7 +1072,15 @@ public class SchedulerApplicationAttempt implements SchedulableEntity {
return false;
}
/*
* Note that the behavior of appAttemptResourceUsage is different from queue's
* For queue, used = actual-used + reserved
* For app, used = actual-used.
*
* TODO (wangda): Need to make behaviors of queue/app's resource usage
* consistent
*/
@VisibleForTesting
public ResourceUsage getAppAttemptResourceUsage() {
return this.attemptResourceUsage;

View File

@ -2164,38 +2164,64 @@ public class LeafQueue extends AbstractCSQueue {
return Collections.unmodifiableCollection(apps);
}
// Consider the headroom for each user in the queue.
// Total pending for the queue =
// sum(for each user(min((user's headroom), sum(user's pending requests))))
// NOTE: Used for calculating pending resources in the preemption monitor.
/**
* Get total pending resource considering user limit for the leaf queue. This
* will be used for calculating pending resources in the preemption monitor.
*
* Consider the headroom for each user in the queue.
* Total pending for the queue =
* sum(for each user(min((user's headroom), sum(user's pending requests))))
* NOTE:
* @param clusterResources clusterResource
* @param partition node partition
* @param deductReservedFromPending When a container is reserved in CS,
* pending resource will not be deducted.
* This could lead to double accounting when
* doing preemption:
* In normal cases, we should deduct reserved
* resource from pending to avoid
* excessive preemption.
* @return Total pending resource considering user limit
*/
public Resource getTotalPendingResourcesConsideringUserLimit(
Resource resources, String partition) {
Resource clusterResources, String partition, boolean deductReservedFromPending) {
try {
readLock.lock();
Map<String, Resource> userNameToHeadroom =
new HashMap<>();
Resource pendingConsideringUserLimit = Resource.newInstance(0, 0);
Resource totalPendingConsideringUserLimit = Resource.newInstance(0, 0);
for (FiCaSchedulerApp app : getApplications()) {
String userName = app.getUser();
if (!userNameToHeadroom.containsKey(userName)) {
User user = getUser(userName);
Resource headroom = Resources.subtract(
computeUserLimit(app.getUser(), resources, user, partition,
computeUserLimit(app.getUser(), clusterResources, user, partition,
SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY),
user.getUsed(partition));
// Make sure headroom is not negative.
headroom = Resources.componentwiseMax(headroom, Resources.none());
userNameToHeadroom.put(userName, headroom);
}
// Check if we need to deduct reserved from pending
Resource pending = app.getAppAttemptResourceUsage().getPending(
partition);
if (deductReservedFromPending) {
pending = Resources.subtract(pending,
app.getAppAttemptResourceUsage().getReserved(partition));
}
pending = Resources.componentwiseMax(pending, Resources.none());
Resource minpendingConsideringUserLimit = Resources.componentwiseMin(
userNameToHeadroom.get(userName),
app.getAppAttemptResourceUsage().getPending(partition));
Resources.addTo(pendingConsideringUserLimit,
userNameToHeadroom.get(userName), pending);
Resources.addTo(totalPendingConsideringUserLimit,
minpendingConsideringUserLimit);
Resources.subtractFrom(userNameToHeadroom.get(userName),
minpendingConsideringUserLimit);
}
return pendingConsideringUserLimit;
return totalPendingConsideringUserLimit;
} finally {
readLock.unlock();
}

View File

@ -600,19 +600,21 @@ public class ProportionalCapacityPreemptionPolicyMockFramework {
qc.setUsedCapacity(partitionName, used);
when(queue.getUsedCapacity()).thenReturn(used);
ru.setPending(partitionName, pending);
if (!isParent(queueExprArray, idx)) {
LeafQueue lq = (LeafQueue) queue;
when(lq.getTotalPendingResourcesConsideringUserLimit(isA(Resource.class),
isA(String.class))).thenReturn(pending);
}
ru.setUsed(partitionName, parseResourceFromString(values[2].trim()));
// Set up the reserved resource if it is contained in the input config
Resource reserved = Resources.none();
if(values.length == 5) {
reserved = parseResourceFromString(values[4].trim());
ru.setReserved(partitionName, reserved);
}
if (!isParent(queueExprArray, idx)) {
LeafQueue lq = (LeafQueue) queue;
when(lq.getTotalPendingResourcesConsideringUserLimit(isA(Resource.class),
isA(String.class), eq(false))).thenReturn(pending);
when(lq.getTotalPendingResourcesConsideringUserLimit(isA(Resource.class),
isA(String.class), eq(true))).thenReturn(
Resources.subtract(pending, reserved));
}
ru.setUsed(partitionName, parseResourceFromString(values[2].trim()));
LOG.debug("Setup queue=" + queueName + " partition=" + partitionName
+ " [abs_guaranteed=" + absGuaranteed + ",abs_max=" + absMax

View File

@ -84,6 +84,7 @@ import static org.junit.Assert.fail;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.anyString;
import static org.mockito.Matchers.argThat;
import static org.mockito.Matchers.eq;
import static org.mockito.Matchers.isA;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
@ -988,7 +989,7 @@ public class TestProportionalCapacityPreemptionPolicy {
ProportionalCapacityPreemptionPolicy policy = buildPolicy(qData);
policy.editSchedule();
verify(mDisp, times(10)).handle(argThat(new IsPreemptionRequestFor(appC)));
verify(mDisp, times(9)).handle(argThat(new IsPreemptionRequestFor(appC)));
assertEquals(10, policy.getQueuePartitions().get("queueE")
.get("").preemptableExtra.getMemorySize());
//2nd level child(E) preempts 10, but parent A has only 9 extra
@ -1003,6 +1004,31 @@ public class TestProportionalCapacityPreemptionPolicy {
tempQueueAPartition.preemptableExtra.getMemorySize());
}
/**
* Builds a policy from a queue layout in which queue A is the only leaf
* with usage, pending demand and a reservation, runs one
* {@code editSchedule()} pass, and verifies that no preemption request is
* dispatched for appA.
*/
@Test
public void testPreemptionNotHappenForSingleReservedQueue() {
/*
* Test case to make sure, when reserved > pending, preemption will not
* happen if there's only one demanding queue.
*/
// Each row is one attribute; columns are root ("/"), then queues A, B, C.
// Queue A has reserved (50) larger than its pending (30); B and C have
// no usage and no pending demand.
int[][] qData = new int[][]{
// / A B C
{ 100, 40, 40, 20 }, // abs
{ 100, 100, 100, 100 }, // maxCap
{ 100, 70, 0, 0 }, // used
{ 10, 30, 0, 0 }, // pending
{ 0, 50, 0, 0 }, // reserved
{ 1, 1, 0, 0 }, // apps
{ -1, 1, 1, 1 }, // req granularity
{ 3, 0, 0, 0 }, // subqueues
};
ProportionalCapacityPreemptionPolicy policy = buildPolicy(qData);
policy.editSchedule();
// No preemption happens: handle() must never be called with a preemption
// request for appA.
verify(mDisp, never()).handle(argThat(new IsPreemptionRequestFor(appA)));
}
static class IsPreemptionRequestFor
extends ArgumentMatcher<ContainerPreemptEvent> {
private final ApplicationAttemptId appAttId;
@ -1223,7 +1249,14 @@ public class TestProportionalCapacityPreemptionPolicy {
List<ApplicationAttemptId> appAttemptIdList =
new ArrayList<ApplicationAttemptId>();
when(lq.getTotalPendingResourcesConsideringUserLimit(isA(Resource.class),
isA(String.class))).thenReturn(pending[i]);
isA(String.class), eq(false))).thenReturn(pending[i]);
when(lq.getTotalPendingResourcesConsideringUserLimit(isA(Resource.class),
isA(String.class), eq(true))).thenReturn(Resources.componentwiseMax(
Resources.subtract(pending[i],
reserved[i] == null ? Resources.none() : reserved[i]),
Resources.none()));
// need to set pending resource in resource usage as well
ResourceUsage ru = new ResourceUsage();
ru.setPending(pending[i]);
@ -1359,27 +1392,4 @@ public class TestProportionalCapacityPreemptionPolicy {
return ret;
}
/**
* Recursively prints a queue and its descendants to standard output.
*
* For a {@code ParentQueue} this prints the queue name, absolute used
* capacity ("cur") and absolute capacity ("guar"), then recurses into each
* child queue with a longer indent. Any other queue is cast to
* {@code LeafQueue}; its pending resources ("pen"), "cur" and "guar" are
* printed, followed by the application id of every application in it.
*
* @param nq the queue to print
* @param indent prefix written before each line printed for {@code nq}
*/
void printString(CSQueue nq, String indent) {
if (nq instanceof ParentQueue) {
System.out.println(indent + nq.getQueueName()
+ " cur:" + nq.getAbsoluteUsedCapacity()
+ " guar:" + nq.getAbsoluteCapacity()
);
// Children are printed one indent level deeper than their parent.
for (CSQueue q : ((ParentQueue)nq).getChildQueues()) {
printString(q, indent + " ");
}
} else {
// NOTE(review): isA(...) is a Mockito argument matcher; here it is passed
// as an ordinary argument outside of when()/verify(). This presumably only
// works because nq is a mock stubbed with the same matchers -- confirm
// before reusing this helper.
System.out.println(indent + nq.getQueueName()
+ " pen:"
+ ((LeafQueue) nq).getTotalPendingResourcesConsideringUserLimit(
isA(Resource.class), isA(String.class))
+ " cur:" + nq.getAbsoluteUsedCapacity()
+ " guar:" + nq.getAbsoluteCapacity()
);
for (FiCaSchedulerApp a : ((LeafQueue)nq).getApplications()) {
System.out.println(indent + " " + a.getApplicationId());
}
}
}
}

View File

@ -56,6 +56,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeRemoved
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateSchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager;
import org.apache.hadoop.yarn.server.utils.BuilderUtils;
import org.apache.hadoop.yarn.util.resource.Resources;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
@ -691,4 +692,88 @@ public class TestContainerAllocation {
rm1.close();
}
/**
* Checks {@code LeafQueue#getTotalPendingResourcesConsideringUserLimit}
* with and without deducting reserved resources from pending.
*
* Two apps from different users run in the "default" queue; after a single
* node heartbeat app1 holds one reserved container. The test then asserts
* the per-app used/reserved/pending values and the queue-level totals for
* both values of the {@code deductReservedFromPending} flag.
*/
@Test
public void testPendingResourcesConsideringUserLimit() throws Exception {
// Set the user-limit-factor of the default queue to 0.5 and let AMs use
// up to 100% of the default partition's resources.
CapacitySchedulerConfiguration newConf = new CapacitySchedulerConfiguration(
conf);
newConf.setUserLimitFactor(CapacitySchedulerConfiguration.ROOT + ".default",
0.5f);
newConf.setMaximumAMResourcePercentPerPartition(
CapacitySchedulerConfiguration.ROOT + ".default", "", 1.0f);
MockRM rm1 = new MockRM(newConf);
rm1.getRMContext().setNodeLabelManager(mgr);
rm1.start();
// Two nodes of 8GB each.
MockNM nm1 = rm1.registerNode("h1:1234", 8 * GB);
MockNM nm2 = rm1.registerNode("h2:1234", 8 * GB);
// launch an app to queue default, AM container should be launched in nm1
RMApp app1 = rm1.submitApp(2 * GB, "app", "u1", null, "default");
MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1);
// launch 2nd app to queue default, AM container should be launched in nm1
RMApp app2 = rm1.submitApp(4 * GB, "app", "u2", null, "default");
MockAM am2 = MockRM.launchAndRegisterAM(app2, rm1, nm1);
// am1 asks 1 * 3G container
am1.allocate("*", 3 * GB, 1, null);
// am2 asks 4 * 5G container
am2.allocate("*", 5 * GB, 4, null);
CapacityScheduler cs = (CapacityScheduler) rm1.getResourceScheduler();
RMNode rmNode1 = rm1.getRMContext().getRMNodes().get(nm1.getNodeId());
// NOTE(review): rmNode2 is not used anywhere below in this test.
RMNode rmNode2 = rm1.getRMContext().getRMNodes().get(nm2.getNodeId());
// Do one node heartbeat on nm1; we expect one container to be reserved
// on nm1.
cs.handle(new NodeUpdateSchedulerEvent(rmNode1));
FiCaSchedulerApp schedulerApp1 =
cs.getApplicationAttempt(am1.getApplicationAttemptId());
// App1 will get 1 container reserved
Assert.assertEquals(1, schedulerApp1.getReservedContainers().size());
/*
* Note that the behavior of appAttemptResourceUsage is different from queue's
* For queue, used = actual-used + reserved
* For app, used = actual-used.
*
* TODO (wangda): Need to make behaviors of queue/app's resource usage
* consistent
*/
// App1: 2GB used (AM only), 3GB reserved, and the same 3GB still counted
// as pending.
Assert.assertEquals(2 * GB,
schedulerApp1.getAppAttemptResourceUsage().getUsed().getMemorySize());
Assert.assertEquals(3 * GB,
schedulerApp1.getAppAttemptResourceUsage().getReserved()
.getMemorySize());
Assert.assertEquals(3 * GB,
schedulerApp1.getAppAttemptResourceUsage().getPending()
.getMemorySize());
FiCaSchedulerApp schedulerApp2 =
cs.getApplicationAttempt(am2.getApplicationAttemptId());
// App2: 4GB used (AM only), nothing reserved, 4 * 5GB pending.
Assert.assertEquals(4 * GB,
schedulerApp2.getAppAttemptResourceUsage().getUsed().getMemorySize());
Assert.assertEquals(0 * GB,
schedulerApp2.getAppAttemptResourceUsage().getReserved()
.getMemorySize());
Assert.assertEquals(5 * 4 * GB,
schedulerApp2.getAppAttemptResourceUsage().getPending()
.getMemorySize());
LeafQueue lq = (LeafQueue) cs.getQueue("default");
// UL = 8GB, so head room of u1 = 8GB - 2GB (AM) - 3GB (Reserved) = 3GB
// u2 = 8GB - 4GB = 4GB
// When not deduct reserved, total-pending = 3G (u1) + 4G (u2) = 7G
// deduct reserved, total-pending = 0G (u1) + 4G = 4G
Assert.assertEquals(7 * GB, lq.getTotalPendingResourcesConsideringUserLimit(
Resources.createResource(20 * GB), "", false).getMemorySize());
Assert.assertEquals(4 * GB, lq.getTotalPendingResourcesConsideringUserLimit(
Resources.createResource(20 * GB), "", true).getMemorySize());
rm1.close();
}
}

View File

@ -3280,8 +3280,9 @@ public class TestLeafQueue {
// even though user_0's apps are still asking for a total of 4GB.
assertEquals(1*GB, app_0.getCurrentConsumption().getMemorySize());
assertEquals(0*GB, app_1.getCurrentConsumption().getMemorySize());
assertEquals(0*GB, e.getTotalPendingResourcesConsideringUserLimit(
clusterResource, RMNodeLabelsManager.NO_LABEL).getMemorySize());
assertEquals(0 * GB,
e.getTotalPendingResourcesConsideringUserLimit(clusterResource,
RMNodeLabelsManager.NO_LABEL, false).getMemorySize());
// Assign 2nd container of 1GB
applyCSAssignment(clusterResource,
@ -3295,7 +3296,7 @@ public class TestLeafQueue {
assertEquals(2*GB, app_0.getCurrentConsumption().getMemorySize());
assertEquals(0*GB, app_1.getCurrentConsumption().getMemorySize());
assertEquals(0*GB, e.getTotalPendingResourcesConsideringUserLimit(
clusterResource, RMNodeLabelsManager.NO_LABEL).getMemorySize());
clusterResource, RMNodeLabelsManager.NO_LABEL, false).getMemorySize());
// Can't allocate 3rd container due to user-limit. Headroom still 0.
applyCSAssignment(clusterResource,
@ -3305,7 +3306,7 @@ public class TestLeafQueue {
assertEquals(2*GB, app_0.getCurrentConsumption().getMemorySize());
assertEquals(0*GB, app_1.getCurrentConsumption().getMemorySize());
assertEquals(0*GB, e.getTotalPendingResourcesConsideringUserLimit(
clusterResource, RMNodeLabelsManager.NO_LABEL).getMemorySize());
clusterResource, RMNodeLabelsManager.NO_LABEL, false).getMemorySize());
// Increase user-limit-factor from 1GB to 10GB (1% * 10 * 100GB = 10GB).
// Pending for both app_0 and app_1 are still 3GB, so user-limit-factor
@ -3313,7 +3314,7 @@ public class TestLeafQueue {
// getTotalPendingResourcesConsideringUserLimit()
e.setUserLimitFactor(10.0f);
assertEquals(3*GB, e.getTotalPendingResourcesConsideringUserLimit(
clusterResource, RMNodeLabelsManager.NO_LABEL).getMemorySize());
clusterResource, RMNodeLabelsManager.NO_LABEL, false).getMemorySize());
applyCSAssignment(clusterResource,
e.assignContainers(clusterResource, node_0,
@ -3323,7 +3324,7 @@ public class TestLeafQueue {
assertEquals(3*GB, app_0.getCurrentConsumption().getMemorySize());
assertEquals(0*GB, app_1.getCurrentConsumption().getMemorySize());
assertEquals(2*GB, e.getTotalPendingResourcesConsideringUserLimit(
clusterResource, RMNodeLabelsManager.NO_LABEL).getMemorySize());
clusterResource, RMNodeLabelsManager.NO_LABEL, false).getMemorySize());
// Get the last 2 containers for app_1, no more pending requests.
applyCSAssignment(clusterResource,
@ -3337,7 +3338,7 @@ public class TestLeafQueue {
assertEquals(3*GB, app_0.getCurrentConsumption().getMemorySize());
assertEquals(2*GB, app_1.getCurrentConsumption().getMemorySize());
assertEquals(0*GB, e.getTotalPendingResourcesConsideringUserLimit(
clusterResource, RMNodeLabelsManager.NO_LABEL).getMemorySize());
clusterResource, RMNodeLabelsManager.NO_LABEL, false).getMemorySize());
// Release each container from app_0
for (RMContainer rmContainer : app_0.getLiveContainers()) {
@ -3446,7 +3447,7 @@ public class TestLeafQueue {
// With queue capacity set at 1% of 100GB and user-limit-factor set to 1.0,
// queue 'e' should be able to consume 1GB per user.
assertEquals(2*GB, e.getTotalPendingResourcesConsideringUserLimit(
clusterResource, RMNodeLabelsManager.NO_LABEL).getMemorySize());
clusterResource, RMNodeLabelsManager.NO_LABEL, false).getMemorySize());
// None of the apps have assigned resources
// user_0's apps:
assertEquals(0*GB, app_0.getCurrentConsumption().getMemorySize());
@ -3463,7 +3464,7 @@ public class TestLeafQueue {
// The first container was assigned to user_0's app_0. The queue's total
// headroom has 1GB left for user_1.
assertEquals(1*GB, e.getTotalPendingResourcesConsideringUserLimit(
clusterResource, RMNodeLabelsManager.NO_LABEL).getMemorySize());
clusterResource, RMNodeLabelsManager.NO_LABEL, false).getMemorySize());
// user_0's apps:
assertEquals(1*GB, app_0.getCurrentConsumption().getMemorySize());
assertEquals(0*GB, app_1.getCurrentConsumption().getMemorySize());
@ -3481,7 +3482,7 @@ public class TestLeafQueue {
// This container went to user_0's app_1, so headroom for queue 'e' is
// still 1GB for user_1.
assertEquals(1*GB, e.getTotalPendingResourcesConsideringUserLimit(
clusterResource, RMNodeLabelsManager.NO_LABEL).getMemorySize());
clusterResource, RMNodeLabelsManager.NO_LABEL, false).getMemorySize());
// user_0's apps:
assertEquals(1*GB, app_0.getCurrentConsumption().getMemorySize());
assertEquals(1*GB, app_1.getCurrentConsumption().getMemorySize());
@ -3497,7 +3498,7 @@ public class TestLeafQueue {
// Container was allocated to user_1's app_2 since user_1 still had headroom.
// Now, no headroom is left.
assertEquals(0*GB, e.getTotalPendingResourcesConsideringUserLimit(
clusterResource, RMNodeLabelsManager.NO_LABEL).getMemorySize());
clusterResource, RMNodeLabelsManager.NO_LABEL, false).getMemorySize());
// user_0's apps:
assertEquals(1*GB, app_0.getCurrentConsumption().getMemorySize());
assertEquals(1*GB, app_1.getCurrentConsumption().getMemorySize());
@ -3513,7 +3514,7 @@ public class TestLeafQueue {
// Allocated to user_1's app_2 since scheduler allocates 1 container
// above user resource limit. Available headroom still 0.
assertEquals(0*GB, e.getTotalPendingResourcesConsideringUserLimit(
clusterResource, RMNodeLabelsManager.NO_LABEL).getMemorySize());
clusterResource, RMNodeLabelsManager.NO_LABEL, false).getMemorySize());
// user_0's apps:
long app_0_consumption = app_0.getCurrentConsumption().getMemorySize();
assertEquals(1*GB, app_0_consumption);
@ -3533,7 +3534,7 @@ public class TestLeafQueue {
// Cannot allocate 5th container because both users are above their allowed
// user resource limit. Values should be the same as previously.
assertEquals(0*GB, e.getTotalPendingResourcesConsideringUserLimit(
clusterResource, RMNodeLabelsManager.NO_LABEL).getMemorySize());
clusterResource, RMNodeLabelsManager.NO_LABEL, false).getMemorySize());
// user_0's apps:
assertEquals(app_0_consumption, app_0.getCurrentConsumption().getMemorySize());
assertEquals(app_1_consumption, app_1.getCurrentConsumption().getMemorySize());
@ -3552,7 +3553,7 @@ public class TestLeafQueue {
SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY), e, nodes, apps);
// Next container goes to user_0's app_1, since it still wanted 1GB.
assertEquals(1*GB, e.getTotalPendingResourcesConsideringUserLimit(
clusterResource, RMNodeLabelsManager.NO_LABEL).getMemorySize());
clusterResource, RMNodeLabelsManager.NO_LABEL, false).getMemorySize());
// user_0's apps:
assertEquals(1*GB, app_0.getCurrentConsumption().getMemorySize());
assertEquals(2*GB, app_1.getCurrentConsumption().getMemorySize());
@ -3567,7 +3568,7 @@ public class TestLeafQueue {
// Last container goes to user_1's app_3, since it still wanted 1GB.
// user_0's apps:
assertEquals(0*GB, e.getTotalPendingResourcesConsideringUserLimit(
clusterResource, RMNodeLabelsManager.NO_LABEL).getMemorySize());
clusterResource, RMNodeLabelsManager.NO_LABEL, false).getMemorySize());
assertEquals(1*GB, app_0.getCurrentConsumption().getMemorySize());
assertEquals(2*GB, app_1.getCurrentConsumption().getMemorySize());
// user_1's apps: