YARN-7496. CS Intra-queue preemption user-limit calculations are not in line with LeafQueue user-limit calculations. (Eric Payne via wangda)
Change-Id: I4dada78a227408a1f2d9bc18099041aad81d67d7
parent a9f0536ced
commit dd47142565
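The substance of the change: when computeUserLimit() is called on behalf of the preemption monitor (forActive == false), it previously divided the queue's resources by the summed weights of all users in the queue; it now divides by the weights of the users that would be active once preemption frees resources. A minimal sketch of the arithmetic, assuming equal user weights; the names below are illustrative, not the actual LeafQueue fields:

    // A minimal sketch, not the actual LeafQueue code; names are illustrative.
    public final class UserLimitSketch {

      // Mirrors the patched computeUserLimit() arithmetic: divide the queue's
      // resources by the summed weights of the users that are (or, for the
      // preemption path, would become) active.
      static float userLimit(float queueResource, float activeUsersTimesWeights,
          boolean userIsActive, float userWeight, boolean forActive) {
        float usersSummedByWeight = activeUsersTimesWeights;
        // The fix: when sizing for preemption (forActive == false), count an
        // inactive user as if the preempted resources had made it active.
        if (!forActive && !userIsActive) {
          usersSummedByWeight = activeUsersTimesWeights + userWeight;
        }
        return queueResource / usersSummedByWeight;
      }

      public static void main(String[] args) {
        // TestLeafQueue scenario below: user_0 active (weight 1.0), user_2
        // inactive, 100 resources in the queue.
        System.out.println(userLimit(100f, 1.0f, false, 1.0f, false)); // 50.0
        // The old preemption path divided by ALL users' weights (3 here),
        // yielding ~33.3 -- a lower limit than the scheduler enforces, which
        // caused over-preemption.
        System.out.println(100f / 3.0f);
      }
    }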
@@ -142,7 +142,6 @@ public class LeafQueue extends AbstractCSQueue {
   private Set<String> activeUsersSet =
       Collections.newSetFromMap(new ConcurrentHashMap<String, Boolean>());
   private float activeUsersTimesWeights = 0.0f;
-  private float allUsersTimesWeights = 0.0f;
 
   @SuppressWarnings({ "unchecked", "rawtypes" })
   public LeafQueue(CapacitySchedulerContext cs,
@@ -307,7 +306,6 @@ public class LeafQueue extends AbstractCSQueue {
       ue.getValue().setWeight(getUserWeightFromQueue(ue.getKey()));
     }
     activeUsersTimesWeights = sumActiveUsersTimesWeights();
-    allUsersTimesWeights = sumAllUsersTimesWeights();
   }
 
   /**
@@ -450,7 +448,6 @@ public class LeafQueue extends AbstractCSQueue {
       user = new User(userName);
       users.put(userName, user);
       user.setWeight(getUserWeightFromQueue(userName));
-      allUsersTimesWeights = sumAllUsersTimesWeights();
     }
     return user;
   }
@@ -871,7 +868,6 @@ public class LeafQueue extends AbstractCSQueue {
     user.finishApplication(wasActive);
     if (user.getTotalApplications() == 0) {
       users.remove(application.getUser());
-      allUsersTimesWeights = sumAllUsersTimesWeights();
     }
 
     // Check if we can activate more applications
@@ -1298,16 +1294,18 @@ public class LeafQueue extends AbstractCSQueue {
     // Also, the queue's configured capacity should be higher than
     // queue-hard-limit * ulMin
 
-    float usersSummedByWeight;
-    if (forActive) {
     if (activeUsersManager.getActiveUsersChanged()) {
       activeUsersSet = activeUsersManager.getActiveUsersSet();
       activeUsersTimesWeights = sumActiveUsersTimesWeights();
       activeUsersManager.clearActiveUsersChanged();
     }
-    usersSummedByWeight = activeUsersTimesWeights;
-    } else {
-      usersSummedByWeight = allUsersTimesWeights;
+    float usersSummedByWeight = activeUsersTimesWeights;
+
+    // Align the preemption algorithm with the assignment algorithm.
+    // If calculating for preemption and the user is not active, calculate the
+    // limit as if the user will be preempted (since that will make it active).
+    if (!forActive && !activeUsersSet.contains(userName)) {
+      usersSummedByWeight = activeUsersTimesWeights + user.getWeight();
     }
 
     // User limit resource is determined by:
@@ -1395,15 +1393,6 @@ public class LeafQueue extends AbstractCSQueue {
     return count;
   }
 
-  synchronized float sumAllUsersTimesWeights() {
-    float count = 0.0f;
-    for (String userName : users.keySet()) {
-      User user = getUser(userName);
-      count += user.getWeight();
-    }
-    return count;
-  }
-
   @Private
   protected synchronized boolean canAssignToUser(Resource clusterResource,
       String userName, Resource limit, Resource rsrv,
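With allUsersTimesWeights and sumAllUsersTimesWeights() removed above, both the assignment path and the preemption path divide by active-user weights only. A worked example, assuming three equal-weight users of whom one is active on a 100-resource queue:

    assignment path (forActive == true):          100 / 1 = 100
    preemption path, user already active:         100 / 1 = 100
    preemption path, user not yet active:         100 / (1 + 1) = 50
    old preemption path (all users' weights):     100 / 3 ≈ 33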
@@ -931,4 +931,46 @@ public class TestProportionalCapacityPreemptionPolicyIntraQueueUserLimit
         new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor(
             getAppAttemptId(1))));
   }
+
+  @Test
+  public void testSimpleUserLimitIntraQueuePreemptionNoOverPreemption()
+      throws IOException {
+    conf.setFloat(CapacitySchedulerConfiguration.
+        INTRAQUEUE_PREEMPTION_MAX_ALLOWABLE_LIMIT,
+        (float) 0.5);
+
+    String labelsConfig = "=100,true;";
+    String nodesConfig = // n1 has no label
+        "n1= res=100";
+    String queuesConfig =
+        // guaranteed,max,used,pending,reserved
+        "root(=[100 100 100 1 0]);" + // root
+        "-a(=[100 100 100 1 0])"; // a
+
+    // When preempting within a queue, it should preempt no more than what the
+    // LeafQueue will then re-assign. In this use case, containers will be
+    // preempted from app1, which will cause app1 and app2 to be active.
+    // LeafQueue will calculate MULP to be 50% because
+    // (100 resources / 2 active users) = 50. We expect 14 containers to be
+    // preempted.
+    // queueName\t(priority,resource,host,expression,#repeat,reserved,pending)
+    // \tMULP
+    String appsConfig =
+        "a\t" // app1 in a
+        + "(1,1,n1,,65,false,0,user1)\t50;" +
+        "a\t" // app2 in a
+        + "(1,1,n1,,35,false,0,user2)\t50;" +
+        "a\t" // app3 in a
+        + "(1,1,n1,,0,false,35,user3)\t50"
+        ;
+
+    buildEnv(labelsConfig, nodesConfig, queuesConfig, appsConfig);
+    policy.editSchedule();
+
+    // app2 is right at its user limit and app1 is over it. Expect 14
+    // containers to be preempted from app1.
+    verify(mDisp, times(14)).handle(argThat(
+        new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor(
+            getAppAttemptId(1))));
+  }
 }
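The expected count of 14 in the test above follows from the configured usage, assuming the one-container buffer the capacity scheduler leaves per user (exercised directly in the TestLeafQueue case below):

    MULP after preemption:        100 / 2 active users = 50
    app1 may keep:                50 + 1 (one-container buffer) = 51
    preempted from app1:          65 - 51 = 14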
@@ -3503,4 +3503,101 @@ public class TestLeafQueue {
     assertEquals(1*GB,
         app_1.getTotalPendingRequestsPerPartition().get("").getMemorySize());
   }
+
+  @Test
+  public void testGetResourceLimitForAllUsers() throws Exception {
+    // Mock the queue
+    LeafQueue a = stubLeafQueue((LeafQueue)queues.get(A));
+    // Set minimum-user-limit-percent for queue "a" so 3 users can be active.
+    csConf.setUserLimit(a.getQueuePath(), 33);
+    // Make sure a single user can consume the entire cluster.
+    csConf.setUserLimitFactor(a.getQueuePath(), 15);
+    csConf.setMaximumCapacity(a.getQueuePath(), 100);
+
+    when(csContext.getClusterResource())
+        .thenReturn(Resources.createResource(100 * GB, 192));
+    a.reinitialize(a, csContext.getClusterResource());
+
+    // Users
+    final String user_0 = "user_0";
+    final String user_1 = "user_1";
+    final String user_2 = "user_2";
+
+    // Submit applications
+    final ApplicationAttemptId appAttemptId_0 =
+        TestUtils.getMockApplicationAttemptId(0, 0);
+    FiCaSchedulerApp app_0 =
+        new FiCaSchedulerApp(appAttemptId_0, user_0, a,
+            a.getActiveUsersManager(), spyRMContext);
+    a.submitApplicationAttempt(app_0, user_0);
+
+    final ApplicationAttemptId appAttemptId_1 =
+        TestUtils.getMockApplicationAttemptId(1, 0);
+    FiCaSchedulerApp app_1 =
+        new FiCaSchedulerApp(appAttemptId_1, user_1, a,
+            a.getActiveUsersManager(), spyRMContext);
+    a.submitApplicationAttempt(app_1, user_1); // different user
+
+    final ApplicationAttemptId appAttemptId_2 =
+        TestUtils.getMockApplicationAttemptId(2, 0);
+    FiCaSchedulerApp app_2 =
+        new FiCaSchedulerApp(appAttemptId_2, user_2, a,
+            a.getActiveUsersManager(), spyRMContext);
+    a.submitApplicationAttempt(app_2, user_2); // different user
+
+    // Setup some nodes
+    String host_0 = "127.0.0.1";
+    FiCaSchedulerNode node_0 = TestUtils.getMockNode(host_0, DEFAULT_RACK, 0, 100*GB);
+
+    final int numNodes = 1;
+    Resource clusterResource =
+        Resources.createResource(numNodes * (100*GB), numNodes * 128);
+    when(csContext.getNumClusterNodes()).thenReturn(numNodes);
+
+    // user_0 consumes 65% of the queue
+    Priority priority = TestUtils.createMockPriority(1);
+    for (int i=0; i < 65; i++) {
+      app_0.updateResourceRequests(Collections.singletonList(
+          TestUtils.createResourceRequest(ResourceRequest.ANY, 1*GB, 1, true,
+              priority, recordFactory)));
+      a.assignContainers(clusterResource, node_0,
+          new ResourceLimits(clusterResource),
+          SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
+    }
+    assertEquals(65*GB, app_0.getCurrentConsumption().getMemorySize());
+
+    // When the minimum user limit percent is set to 33%, the capacity scheduler
+    // will try to assign 35% of resources to each user. This is because the
+    // capacity scheduler leaves a slight buffer for each user.
+    for (int i=0; i < 35; i++) {
+      app_1.updateResourceRequests(Collections.singletonList(
+          TestUtils.createResourceRequest(ResourceRequest.ANY, 1*GB, 1, true,
+              priority, recordFactory)));
+
+      a.assignContainers(clusterResource, node_0,
+          new ResourceLimits(clusterResource),
+          SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
+    }
+    assertEquals(35*GB, app_1.getCurrentConsumption().getMemorySize());
+    assertEquals(0, a.getActiveUsersManager().getNumActiveUsers());
+
+    app_0.updateResourceRequests(Collections.singletonList(
+        TestUtils.createResourceRequest(ResourceRequest.ANY, 1*GB, 35, true,
+            priority, recordFactory)));
+    a.assignContainers(clusterResource, node_0,
+        new ResourceLimits(clusterResource),
+        SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
+    assertEquals(0*GB, app_2.getCurrentConsumption().getMemorySize());
+    assertEquals(1, a.getActiveUsersManager().getNumActiveUsers());
+
+    // With one user over its user limit and requesting more (user_0), one
+    // user exactly at its user limit guarantee (user_1) and one inactive
+    // user being calculated for (user_2), preemption should calculate the
+    // user limit to be 50% of the resources. Since the capacity scheduler
+    // will leave a buffer of 1 container, 51GB should be the amount of
+    // resources calculated for preemption.
+    Resource ulForallUsers = a.getResourceLimitForAllUsers(user_2,
+        clusterResource, "", SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
+    assertEquals(51*GB, ulForallUsers.getMemorySize());
+  }
 }
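The 51GB assertion is the same arithmetic applied to getResourceLimitForAllUsers(): active user_0 (weight 1.0) plus user_2 counted as if active gives usersSummedByWeight = 2, so 100GB / 2 = 50GB, plus one 1GB minimum allocation of headroom = 51GB. Before the patch, this path divided by all three users' weights, producing roughly 34GB and over-preempting relative to what the LeafQueue would then re-assign.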