YARN-3769. Consider user limit when calculating total pending resource for preemption policy in Capacity Scheduler. (Eric Payne via wangda)
(cherry picked from commit 2346fa3141
)
This commit is contained in:
parent
f3bca8bc1a
commit
cdca460ad4
|
@ -1026,6 +1026,9 @@ Release 2.7.3 - UNRELEASED
|
||||||
YARN-4374. RM capacity scheduler UI rounds user limit factor (Chang Li via
|
YARN-4374. RM capacity scheduler UI rounds user limit factor (Chang Li via
|
||||||
jlowe)
|
jlowe)
|
||||||
|
|
||||||
|
YARN-3769. Consider user limit when calculating total pending resource for
|
||||||
|
preemption policy in Capacity Scheduler. (Eric Payne via wangda)
|
||||||
|
|
||||||
Release 2.7.2 - UNRELEASED
|
Release 2.7.2 - UNRELEASED
|
||||||
|
|
||||||
INCOMPATIBLE CHANGES
|
INCOMPATIBLE CHANGES
|
||||||
|
|
|
@ -872,7 +872,8 @@ private TempQueuePerPartition cloneQueues(CSQueue curQueue,
|
||||||
if (curQueue instanceof LeafQueue) {
|
if (curQueue instanceof LeafQueue) {
|
||||||
LeafQueue l = (LeafQueue) curQueue;
|
LeafQueue l = (LeafQueue) curQueue;
|
||||||
Resource pending =
|
Resource pending =
|
||||||
l.getQueueResourceUsage().getPending(partitionToLookAt);
|
l.getTotalPendingResourcesConsideringUserLimit(
|
||||||
|
partitionResource, partitionToLookAt);
|
||||||
ret = new TempQueuePerPartition(queueName, current, pending, guaranteed,
|
ret = new TempQueuePerPartition(queueName, current, pending, guaranteed,
|
||||||
maxCapacity, preemptionDisabled, partitionToLookAt);
|
maxCapacity, preemptionDisabled, partitionToLookAt);
|
||||||
if (preemptionDisabled) {
|
if (preemptionDisabled) {
|
||||||
|
|
|
@ -64,7 +64,6 @@
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode;
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy.OrderingPolicy;
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy.OrderingPolicy;
|
||||||
import org.apache.hadoop.yarn.server.utils.BuilderUtils;
|
|
||||||
import org.apache.hadoop.yarn.server.utils.Lock;
|
import org.apache.hadoop.yarn.server.utils.Lock;
|
||||||
import org.apache.hadoop.yarn.server.utils.Lock.NoLock;
|
import org.apache.hadoop.yarn.server.utils.Lock.NoLock;
|
||||||
import org.apache.hadoop.yarn.util.resource.Resources;
|
import org.apache.hadoop.yarn.util.resource.Resources;
|
||||||
|
@ -1515,14 +1514,35 @@ public Collection<FiCaSchedulerApp> getApplications() {
|
||||||
return orderingPolicy.getSchedulableEntities();
|
return orderingPolicy.getSchedulableEntities();
|
||||||
}
|
}
|
||||||
|
|
||||||
// return a single Resource capturing the overal amount of pending resources
|
// Consider the headroom for each user in the queue.
|
||||||
public synchronized Resource getTotalResourcePending() {
|
// Total pending for the queue =
|
||||||
Resource ret = BuilderUtils.newResource(0, 0);
|
// sum(for each user(min((user's headroom), sum(user's pending requests))))
|
||||||
for (FiCaSchedulerApp f :
|
// NOTE: Used for calculating pedning resources in the preemption monitor.
|
||||||
orderingPolicy.getSchedulableEntities()) {
|
public synchronized Resource getTotalPendingResourcesConsideringUserLimit(
|
||||||
Resources.addTo(ret, f.getTotalPendingRequests());
|
Resource resources, String partition) {
|
||||||
|
Map<String, Resource> userNameToHeadroom = new HashMap<String, Resource>();
|
||||||
|
Resource pendingConsideringUserLimit = Resource.newInstance(0, 0);
|
||||||
|
for (FiCaSchedulerApp app : getApplications()) {
|
||||||
|
String userName = app.getUser();
|
||||||
|
if (!userNameToHeadroom.containsKey(userName)) {
|
||||||
|
User user = getUser(userName);
|
||||||
|
Resource headroom = Resources.subtract(
|
||||||
|
computeUserLimit(app, resources, user, partition,
|
||||||
|
SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY),
|
||||||
|
user.getUsed(partition));
|
||||||
|
// Make sure headroom is not negative.
|
||||||
|
headroom = Resources.componentwiseMax(headroom, Resources.none());
|
||||||
|
userNameToHeadroom.put(userName, headroom);
|
||||||
}
|
}
|
||||||
return ret;
|
Resource minpendingConsideringUserLimit =
|
||||||
|
Resources.componentwiseMin(userNameToHeadroom.get(userName),
|
||||||
|
app.getAppAttemptResourceUsage().getPending(partition));
|
||||||
|
Resources.addTo(pendingConsideringUserLimit,
|
||||||
|
minpendingConsideringUserLimit);
|
||||||
|
Resources.subtractFrom(
|
||||||
|
userNameToHeadroom.get(userName), minpendingConsideringUserLimit);
|
||||||
|
}
|
||||||
|
return pendingConsideringUserLimit;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
|
|
@ -1150,7 +1150,8 @@ LeafQueue mockLeafQueue(ParentQueue p, Resource tot, int i, Resource[] abs,
|
||||||
ResourceCalculator rc = mCS.getResourceCalculator();
|
ResourceCalculator rc = mCS.getResourceCalculator();
|
||||||
List<ApplicationAttemptId> appAttemptIdList =
|
List<ApplicationAttemptId> appAttemptIdList =
|
||||||
new ArrayList<ApplicationAttemptId>();
|
new ArrayList<ApplicationAttemptId>();
|
||||||
when(lq.getTotalResourcePending()).thenReturn(pending[i]);
|
when(lq.getTotalPendingResourcesConsideringUserLimit(isA(Resource.class),
|
||||||
|
isA(String.class))).thenReturn(pending[i]);
|
||||||
// need to set pending resource in resource usage as well
|
// need to set pending resource in resource usage as well
|
||||||
ResourceUsage ru = new ResourceUsage();
|
ResourceUsage ru = new ResourceUsage();
|
||||||
ru.setPending(pending[i]);
|
ru.setPending(pending[i]);
|
||||||
|
@ -1292,7 +1293,9 @@ void printString(CSQueue nq, String indent) {
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
System.out.println(indent + nq.getQueueName()
|
System.out.println(indent + nq.getQueueName()
|
||||||
+ " pen:" + ((LeafQueue) nq).getTotalResourcePending()
|
+ " pen:"
|
||||||
|
+ ((LeafQueue) nq).getTotalPendingResourcesConsideringUserLimit(
|
||||||
|
isA(Resource.class), isA(String.class))
|
||||||
+ " cur:" + nq.getAbsoluteUsedCapacity()
|
+ " cur:" + nq.getAbsoluteUsedCapacity()
|
||||||
+ " guar:" + nq.getAbsoluteCapacity()
|
+ " guar:" + nq.getAbsoluteCapacity()
|
||||||
);
|
);
|
||||||
|
|
|
@ -25,6 +25,7 @@
|
||||||
import static org.mockito.Matchers.any;
|
import static org.mockito.Matchers.any;
|
||||||
import static org.mockito.Matchers.argThat;
|
import static org.mockito.Matchers.argThat;
|
||||||
import static org.mockito.Matchers.eq;
|
import static org.mockito.Matchers.eq;
|
||||||
|
import static org.mockito.Matchers.isA;
|
||||||
import static org.mockito.Mockito.doAnswer;
|
import static org.mockito.Mockito.doAnswer;
|
||||||
import static org.mockito.Mockito.mock;
|
import static org.mockito.Mockito.mock;
|
||||||
import static org.mockito.Mockito.never;
|
import static org.mockito.Mockito.never;
|
||||||
|
@ -1219,6 +1220,11 @@ private void setupQueue(CSQueue queue, String q, String[] queueExprArray,
|
||||||
qc.setAbsoluteMaximumCapacity(partitionName, absMax);
|
qc.setAbsoluteMaximumCapacity(partitionName, absMax);
|
||||||
qc.setAbsoluteUsedCapacity(partitionName, absUsed);
|
qc.setAbsoluteUsedCapacity(partitionName, absUsed);
|
||||||
ru.setPending(partitionName, pending);
|
ru.setPending(partitionName, pending);
|
||||||
|
if (!isParent(queueExprArray, idx)) {
|
||||||
|
LeafQueue lq = (LeafQueue) queue;
|
||||||
|
when(lq.getTotalPendingResourcesConsideringUserLimit(isA(Resource.class),
|
||||||
|
isA(String.class))).thenReturn(pending);
|
||||||
|
}
|
||||||
ru.setUsed(partitionName, parseResourceFromString(values[2].trim()));
|
ru.setUsed(partitionName, parseResourceFromString(values[2].trim()));
|
||||||
LOG.debug("Setup queue=" + queueName + " partition=" + partitionName
|
LOG.debug("Setup queue=" + queueName + " partition=" + partitionName
|
||||||
+ " [abs_guaranteed=" + absGuaranteed + ",abs_max=" + absMax
|
+ " [abs_guaranteed=" + absGuaranteed + ",abs_max=" + absMax
|
||||||
|
|
|
@ -61,6 +61,7 @@
|
||||||
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
|
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
|
import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
|
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
|
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
|
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
|
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
|
||||||
|
@ -89,6 +90,7 @@
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
import org.mockito.Matchers;
|
import org.mockito.Matchers;
|
||||||
import org.mockito.Mockito;
|
import org.mockito.Mockito;
|
||||||
|
import org.mortbay.log.Log;
|
||||||
|
|
||||||
public class TestLeafQueue {
|
public class TestLeafQueue {
|
||||||
private final RecordFactory recordFactory =
|
private final RecordFactory recordFactory =
|
||||||
|
@ -2705,6 +2707,363 @@ public void testLocalityDelaySkipsApplication() throws Exception {
|
||||||
assertEquals(1, app_1.getLiveContainers().size());
|
assertEquals(1, app_1.getLiveContainers().size());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testGetTotalPendingResourcesConsideringUserLimitOneUser()
|
||||||
|
throws Exception {
|
||||||
|
// Manipulate queue 'e'
|
||||||
|
LeafQueue e = stubLeafQueue((LeafQueue)queues.get(E));
|
||||||
|
// Allow queue 'e' to use 100% of cluster resources (max capacity).
|
||||||
|
e.setMaxCapacity(1.0f);
|
||||||
|
// When used queue resources goes above capacity (in this case, 1%), user
|
||||||
|
// resource limit (used in calculating headroom) is calculated in small
|
||||||
|
// increments to ensure that user-limit-percent can be met for all users in
|
||||||
|
// a queue. Take user-limit-percent out of the equation so that user
|
||||||
|
// resource limit will always be calculated to its max possible value.
|
||||||
|
e.setUserLimit(1000);
|
||||||
|
|
||||||
|
final String user_0 = "user_0";
|
||||||
|
|
||||||
|
// Submit 2 applications for user_0
|
||||||
|
final ApplicationAttemptId appAttemptId_0 =
|
||||||
|
TestUtils.getMockApplicationAttemptId(0, 0);
|
||||||
|
FiCaSchedulerApp app_0 =
|
||||||
|
new FiCaSchedulerApp(appAttemptId_0, user_0, e,
|
||||||
|
mock(ActiveUsersManager.class), spyRMContext);
|
||||||
|
e.submitApplicationAttempt(app_0, user_0);
|
||||||
|
|
||||||
|
final ApplicationAttemptId appAttemptId_1 =
|
||||||
|
TestUtils.getMockApplicationAttemptId(1, 0);
|
||||||
|
FiCaSchedulerApp app_1 =
|
||||||
|
new FiCaSchedulerApp(appAttemptId_1, user_0, e,
|
||||||
|
mock(ActiveUsersManager.class), spyRMContext);
|
||||||
|
e.submitApplicationAttempt(app_1, user_0); // same user
|
||||||
|
|
||||||
|
// Setup 1 node with 100GB of memory resources.
|
||||||
|
String host_0 = "127.0.0.1";
|
||||||
|
FiCaSchedulerNode node_0 = TestUtils.getMockNode(host_0, DEFAULT_RACK, 0,
|
||||||
|
100*GB);
|
||||||
|
|
||||||
|
final int numNodes = 1;
|
||||||
|
Resource clusterResource =
|
||||||
|
Resources.createResource(numNodes * (100*GB), numNodes * 128);
|
||||||
|
when(csContext.getNumClusterNodes()).thenReturn(numNodes);
|
||||||
|
|
||||||
|
// Pending resource requests for app_0 and app_1 total 5GB.
|
||||||
|
Priority priority = TestUtils.createMockPriority(1);
|
||||||
|
app_0.updateResourceRequests(Collections.singletonList(
|
||||||
|
TestUtils.createResourceRequest(ResourceRequest.ANY, 1*GB, 3, true,
|
||||||
|
priority, recordFactory)));
|
||||||
|
|
||||||
|
app_1.updateResourceRequests(Collections.singletonList(
|
||||||
|
TestUtils.createResourceRequest(ResourceRequest.ANY, 1*GB, 2, true,
|
||||||
|
priority, recordFactory)));
|
||||||
|
|
||||||
|
// Start testing...
|
||||||
|
|
||||||
|
// Assign 1st Container of 1GB
|
||||||
|
e.assignContainers(clusterResource, node_0,
|
||||||
|
new ResourceLimits(clusterResource),
|
||||||
|
SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
|
||||||
|
// With queue capacity set at 1% of 100GB and user-limit-factor set to 1.0,
|
||||||
|
// all users (only user_0) queue 'e' should be able to consume 1GB.
|
||||||
|
// The first container should be assigned to app_0 with no headroom left
|
||||||
|
// even though user_0's apps are still asking for a total of 4GB.
|
||||||
|
assertEquals(1*GB, app_0.getCurrentConsumption().getMemory());
|
||||||
|
assertEquals(0*GB, app_1.getCurrentConsumption().getMemory());
|
||||||
|
assertEquals(0*GB, e.getTotalPendingResourcesConsideringUserLimit(
|
||||||
|
clusterResource, RMNodeLabelsManager.NO_LABEL).getMemory());
|
||||||
|
|
||||||
|
// Assign 2nd container of 1GB
|
||||||
|
e.assignContainers(clusterResource, node_0,
|
||||||
|
new ResourceLimits(clusterResource),
|
||||||
|
SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
|
||||||
|
// user_0 has no headroom due to user-limit-factor of 1.0. However capacity
|
||||||
|
// scheduler will assign one container more than user-limit-factor.
|
||||||
|
// This container also went to app_0. Still with no neadroom even though
|
||||||
|
// app_0 and app_1 are asking for a cumulative 3GB.
|
||||||
|
assertEquals(2*GB, app_0.getCurrentConsumption().getMemory());
|
||||||
|
assertEquals(0*GB, app_1.getCurrentConsumption().getMemory());
|
||||||
|
assertEquals(0*GB, e.getTotalPendingResourcesConsideringUserLimit(
|
||||||
|
clusterResource, RMNodeLabelsManager.NO_LABEL).getMemory());
|
||||||
|
|
||||||
|
// Can't allocate 3rd container due to user-limit. Headroom still 0.
|
||||||
|
e.assignContainers(clusterResource, node_0,
|
||||||
|
new ResourceLimits(clusterResource),
|
||||||
|
SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
|
||||||
|
assertEquals(2*GB, app_0.getCurrentConsumption().getMemory());
|
||||||
|
assertEquals(0*GB, app_1.getCurrentConsumption().getMemory());
|
||||||
|
assertEquals(0*GB, e.getTotalPendingResourcesConsideringUserLimit(
|
||||||
|
clusterResource, RMNodeLabelsManager.NO_LABEL).getMemory());
|
||||||
|
|
||||||
|
// Increase user-limit-factor from 1GB to 10GB (1% * 10 * 100GB = 10GB).
|
||||||
|
// Pending for both app_0 and app_1 are still 3GB, so user-limit-factor
|
||||||
|
// is no longer limiting the return value of
|
||||||
|
// getTotalPendingResourcesConsideringUserLimit()
|
||||||
|
e.setUserLimitFactor(10.0f);
|
||||||
|
assertEquals(3*GB, e.getTotalPendingResourcesConsideringUserLimit(
|
||||||
|
clusterResource, RMNodeLabelsManager.NO_LABEL).getMemory());
|
||||||
|
|
||||||
|
e.assignContainers(clusterResource, node_0,
|
||||||
|
new ResourceLimits(clusterResource),
|
||||||
|
SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
|
||||||
|
// app_0 is now satisified, app_1 is still asking for 2GB.
|
||||||
|
assertEquals(3*GB, app_0.getCurrentConsumption().getMemory());
|
||||||
|
assertEquals(0*GB, app_1.getCurrentConsumption().getMemory());
|
||||||
|
assertEquals(2*GB, e.getTotalPendingResourcesConsideringUserLimit(
|
||||||
|
clusterResource, RMNodeLabelsManager.NO_LABEL).getMemory());
|
||||||
|
|
||||||
|
// Get the last 2 containers for app_1, no more pending requests.
|
||||||
|
e.assignContainers(clusterResource, node_0,
|
||||||
|
new ResourceLimits(clusterResource),
|
||||||
|
SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
|
||||||
|
e.assignContainers(clusterResource, node_0,
|
||||||
|
new ResourceLimits(clusterResource),
|
||||||
|
SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
|
||||||
|
assertEquals(3*GB, app_0.getCurrentConsumption().getMemory());
|
||||||
|
assertEquals(2*GB, app_1.getCurrentConsumption().getMemory());
|
||||||
|
assertEquals(0*GB, e.getTotalPendingResourcesConsideringUserLimit(
|
||||||
|
clusterResource, RMNodeLabelsManager.NO_LABEL).getMemory());
|
||||||
|
|
||||||
|
// Release each container from app_0
|
||||||
|
for (RMContainer rmContainer : app_0.getLiveContainers()) {
|
||||||
|
e.completedContainer(clusterResource, app_0, node_0, rmContainer,
|
||||||
|
ContainerStatus.newInstance(rmContainer.getContainerId(),
|
||||||
|
ContainerState.COMPLETE, "",
|
||||||
|
ContainerExitStatus.KILLED_BY_RESOURCEMANAGER),
|
||||||
|
RMContainerEventType.KILL, null, true);
|
||||||
|
}
|
||||||
|
|
||||||
|
// Release each container from app_1
|
||||||
|
for (RMContainer rmContainer : app_1.getLiveContainers()) {
|
||||||
|
e.completedContainer(clusterResource, app_1, node_0, rmContainer,
|
||||||
|
ContainerStatus.newInstance(rmContainer.getContainerId(),
|
||||||
|
ContainerState.COMPLETE, "",
|
||||||
|
ContainerExitStatus.KILLED_BY_RESOURCEMANAGER),
|
||||||
|
RMContainerEventType.KILL, null, true);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testGetTotalPendingResourcesConsideringUserLimitTwoUsers()
|
||||||
|
throws Exception {
|
||||||
|
// Manipulate queue 'e'
|
||||||
|
LeafQueue e = stubLeafQueue((LeafQueue)queues.get(E));
|
||||||
|
// Allow queue 'e' to use 100% of cluster resources (max capacity).
|
||||||
|
e.setMaxCapacity(1.0f);
|
||||||
|
// When used queue resources goes above capacity (in this case, 1%), user
|
||||||
|
// resource limit (used in calculating headroom) is calculated in small
|
||||||
|
// increments to ensure that user-limit-percent can be met for all users in
|
||||||
|
// a queue. Take user-limit-percent out of the equation so that user
|
||||||
|
// resource limit will always be calculated to its max possible value.
|
||||||
|
e.setUserLimit(1000);
|
||||||
|
|
||||||
|
final String user_0 = "user_0";
|
||||||
|
final String user_1 = "user_1";
|
||||||
|
|
||||||
|
// Submit 2 applications for user_0
|
||||||
|
final ApplicationAttemptId appAttemptId_0 =
|
||||||
|
TestUtils.getMockApplicationAttemptId(0, 0);
|
||||||
|
FiCaSchedulerApp app_0 =
|
||||||
|
new FiCaSchedulerApp(appAttemptId_0, user_0, e,
|
||||||
|
mock(ActiveUsersManager.class), spyRMContext);
|
||||||
|
e.submitApplicationAttempt(app_0, user_0);
|
||||||
|
|
||||||
|
final ApplicationAttemptId appAttemptId_1 =
|
||||||
|
TestUtils.getMockApplicationAttemptId(1, 0);
|
||||||
|
FiCaSchedulerApp app_1 =
|
||||||
|
new FiCaSchedulerApp(appAttemptId_1, user_0, e,
|
||||||
|
mock(ActiveUsersManager.class), spyRMContext);
|
||||||
|
e.submitApplicationAttempt(app_1, user_0);
|
||||||
|
|
||||||
|
// Submit 2 applications for user_1
|
||||||
|
final ApplicationAttemptId appAttemptId_2 =
|
||||||
|
TestUtils.getMockApplicationAttemptId(2, 0);
|
||||||
|
FiCaSchedulerApp app_2 =
|
||||||
|
new FiCaSchedulerApp(appAttemptId_2, user_1, e,
|
||||||
|
mock(ActiveUsersManager.class), spyRMContext);
|
||||||
|
e.submitApplicationAttempt(app_2, user_1);
|
||||||
|
|
||||||
|
final ApplicationAttemptId appAttemptId_3 =
|
||||||
|
TestUtils.getMockApplicationAttemptId(3, 0);
|
||||||
|
FiCaSchedulerApp app_3 =
|
||||||
|
new FiCaSchedulerApp(appAttemptId_3, user_1, e,
|
||||||
|
mock(ActiveUsersManager.class), spyRMContext);
|
||||||
|
e.submitApplicationAttempt(app_3, user_1);
|
||||||
|
|
||||||
|
// Setup 1 node with 100GB of memory resources.
|
||||||
|
String host_0 = "127.0.0.1";
|
||||||
|
FiCaSchedulerNode node_0 = TestUtils.getMockNode(host_0, DEFAULT_RACK, 0,
|
||||||
|
100*GB);
|
||||||
|
|
||||||
|
final int numNodes = 1;
|
||||||
|
Resource clusterResource =
|
||||||
|
Resources.createResource(numNodes * (100*GB), numNodes * 128);
|
||||||
|
when(csContext.getNumClusterNodes()).thenReturn(numNodes);
|
||||||
|
|
||||||
|
// Pending resource requests for user_0: app_0 and app_1 total 3GB.
|
||||||
|
Priority priority = TestUtils.createMockPriority(1);
|
||||||
|
app_0.updateResourceRequests(Collections.singletonList(
|
||||||
|
TestUtils.createResourceRequest(ResourceRequest.ANY, 1*GB, 1, true,
|
||||||
|
priority, recordFactory)));
|
||||||
|
|
||||||
|
app_1.updateResourceRequests(Collections.singletonList(
|
||||||
|
TestUtils.createResourceRequest(ResourceRequest.ANY, 1*GB, 2, true,
|
||||||
|
priority, recordFactory)));
|
||||||
|
|
||||||
|
// Pending resource requests for user_1: app_2 and app_3 total 1GB.
|
||||||
|
priority = TestUtils.createMockPriority(1);
|
||||||
|
app_2.updateResourceRequests(Collections.singletonList(
|
||||||
|
TestUtils.createResourceRequest(ResourceRequest.ANY, 1*GB, 2, true,
|
||||||
|
priority, recordFactory)));
|
||||||
|
|
||||||
|
app_3.updateResourceRequests(Collections.singletonList(
|
||||||
|
TestUtils.createResourceRequest(ResourceRequest.ANY, 1*GB, 1, true,
|
||||||
|
priority, recordFactory)));
|
||||||
|
|
||||||
|
// Start testing...
|
||||||
|
// With queue capacity set at 1% of 100GB and user-limit-factor set to 1.0,
|
||||||
|
// queue 'e' should be able to consume 1GB per user.
|
||||||
|
assertEquals(2*GB, e.getTotalPendingResourcesConsideringUserLimit(
|
||||||
|
clusterResource, RMNodeLabelsManager.NO_LABEL).getMemory());
|
||||||
|
// None of the apps have assigned resources
|
||||||
|
// user_0's apps:
|
||||||
|
assertEquals(0*GB, app_0.getCurrentConsumption().getMemory());
|
||||||
|
assertEquals(0*GB, app_1.getCurrentConsumption().getMemory());
|
||||||
|
// user_1's apps:
|
||||||
|
assertEquals(0*GB, app_2.getCurrentConsumption().getMemory());
|
||||||
|
assertEquals(0*GB, app_3.getCurrentConsumption().getMemory());
|
||||||
|
|
||||||
|
// Assign 1st Container of 1GB
|
||||||
|
e.assignContainers(clusterResource, node_0,
|
||||||
|
new ResourceLimits(clusterResource),
|
||||||
|
SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
|
||||||
|
// The first container was assigned to user_0's app_0. Queues total headroom
|
||||||
|
// has 1GB left for user_1.
|
||||||
|
assertEquals(1*GB, e.getTotalPendingResourcesConsideringUserLimit(
|
||||||
|
clusterResource, RMNodeLabelsManager.NO_LABEL).getMemory());
|
||||||
|
// user_0's apps:
|
||||||
|
assertEquals(1*GB, app_0.getCurrentConsumption().getMemory());
|
||||||
|
assertEquals(0*GB, app_1.getCurrentConsumption().getMemory());
|
||||||
|
// user_1's apps:
|
||||||
|
assertEquals(0*GB, app_2.getCurrentConsumption().getMemory());
|
||||||
|
assertEquals(0*GB, app_3.getCurrentConsumption().getMemory());
|
||||||
|
|
||||||
|
// Assign 2nd container of 1GB
|
||||||
|
e.assignContainers(clusterResource, node_0,
|
||||||
|
new ResourceLimits(clusterResource),
|
||||||
|
SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
|
||||||
|
// user_0 has no headroom due to user-limit-factor of 1.0. However capacity
|
||||||
|
// scheduler will assign one container more than user-limit-factor. So,
|
||||||
|
// this container went to user_0's app_1. so, headroom for queue 'e'e is
|
||||||
|
// still 1GB for user_1
|
||||||
|
assertEquals(1*GB, e.getTotalPendingResourcesConsideringUserLimit(
|
||||||
|
clusterResource, RMNodeLabelsManager.NO_LABEL).getMemory());
|
||||||
|
// user_0's apps:
|
||||||
|
assertEquals(1*GB, app_0.getCurrentConsumption().getMemory());
|
||||||
|
assertEquals(1*GB, app_1.getCurrentConsumption().getMemory());
|
||||||
|
// user_1's apps:
|
||||||
|
assertEquals(0*GB, app_2.getCurrentConsumption().getMemory());
|
||||||
|
assertEquals(0*GB, app_3.getCurrentConsumption().getMemory());
|
||||||
|
|
||||||
|
// Assign 3rd container.
|
||||||
|
e.assignContainers(clusterResource, node_0,
|
||||||
|
new ResourceLimits(clusterResource),
|
||||||
|
SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
|
||||||
|
// Container was allocated to user_1's app_2 since user_1, Now, no headroom
|
||||||
|
// is left.
|
||||||
|
assertEquals(0*GB, e.getTotalPendingResourcesConsideringUserLimit(
|
||||||
|
clusterResource, RMNodeLabelsManager.NO_LABEL).getMemory());
|
||||||
|
// user_0's apps:
|
||||||
|
assertEquals(1*GB, app_0.getCurrentConsumption().getMemory());
|
||||||
|
assertEquals(1*GB, app_1.getCurrentConsumption().getMemory());
|
||||||
|
// user_1's apps:
|
||||||
|
assertEquals(1*GB, app_2.getCurrentConsumption().getMemory());
|
||||||
|
assertEquals(0*GB, app_3.getCurrentConsumption().getMemory());
|
||||||
|
|
||||||
|
// Assign 4th container.
|
||||||
|
e.assignContainers(clusterResource, node_0,
|
||||||
|
new ResourceLimits(clusterResource),
|
||||||
|
SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
|
||||||
|
// Allocated to user_1's app_2 since scheduler allocates 1 container
|
||||||
|
// above user resource limit. Available headroom still 0.
|
||||||
|
assertEquals(0*GB, e.getTotalPendingResourcesConsideringUserLimit(
|
||||||
|
clusterResource, RMNodeLabelsManager.NO_LABEL).getMemory());
|
||||||
|
// user_0's apps:
|
||||||
|
int app_0_consumption = app_0.getCurrentConsumption().getMemory();
|
||||||
|
assertEquals(1*GB, app_0_consumption);
|
||||||
|
int app_1_consumption = app_1.getCurrentConsumption().getMemory();
|
||||||
|
assertEquals(1*GB, app_1_consumption);
|
||||||
|
// user_1's apps:
|
||||||
|
int app_2_consumption = app_2.getCurrentConsumption().getMemory();
|
||||||
|
assertEquals(2*GB, app_2_consumption);
|
||||||
|
int app_3_consumption = app_3.getCurrentConsumption().getMemory();
|
||||||
|
assertEquals(0*GB, app_3_consumption);
|
||||||
|
|
||||||
|
// Attempt to assign 5th container. Will be a no-op.
|
||||||
|
e.assignContainers(clusterResource, node_0,
|
||||||
|
new ResourceLimits(clusterResource),
|
||||||
|
SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
|
||||||
|
// Cannot allocate 5th container because both users are above their allowed
|
||||||
|
// user resource limit. Values should be the same as previously.
|
||||||
|
assertEquals(0*GB, e.getTotalPendingResourcesConsideringUserLimit(
|
||||||
|
clusterResource, RMNodeLabelsManager.NO_LABEL).getMemory());
|
||||||
|
// user_0's apps:
|
||||||
|
assertEquals(app_0_consumption, app_0.getCurrentConsumption().getMemory());
|
||||||
|
assertEquals(app_1_consumption, app_1.getCurrentConsumption().getMemory());
|
||||||
|
// user_1's apps:
|
||||||
|
assertEquals(app_2_consumption, app_2.getCurrentConsumption().getMemory());
|
||||||
|
assertEquals(app_3_consumption, app_3.getCurrentConsumption().getMemory());
|
||||||
|
|
||||||
|
// Increase user-limit-factor from 1GB to 10GB (1% * 10 * 100GB = 10GB).
|
||||||
|
// Pending for both user_0 and user_1 are still 1GB each, so user-limit-
|
||||||
|
// factor is no longer the limiting factor.
|
||||||
|
e.setUserLimitFactor(10.0f);
|
||||||
|
|
||||||
|
e.assignContainers(clusterResource, node_0,
|
||||||
|
new ResourceLimits(clusterResource),
|
||||||
|
SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
|
||||||
|
// Next container goes to user_0's app_1, since it still wanted 1GB.
|
||||||
|
assertEquals(1*GB, e.getTotalPendingResourcesConsideringUserLimit(
|
||||||
|
clusterResource, RMNodeLabelsManager.NO_LABEL).getMemory());
|
||||||
|
// user_0's apps:
|
||||||
|
assertEquals(1*GB, app_0.getCurrentConsumption().getMemory());
|
||||||
|
assertEquals(2*GB, app_1.getCurrentConsumption().getMemory());
|
||||||
|
// user_1's apps:
|
||||||
|
assertEquals(2*GB, app_2.getCurrentConsumption().getMemory());
|
||||||
|
assertEquals(0*GB, app_3.getCurrentConsumption().getMemory());
|
||||||
|
|
||||||
|
e.assignContainers(clusterResource, node_0,
|
||||||
|
new ResourceLimits(clusterResource),
|
||||||
|
SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
|
||||||
|
// Last container goes to user_1's app_3, since it still wanted 1GB.
|
||||||
|
// user_0's apps:
|
||||||
|
assertEquals(0*GB, e.getTotalPendingResourcesConsideringUserLimit(
|
||||||
|
clusterResource, RMNodeLabelsManager.NO_LABEL).getMemory());
|
||||||
|
assertEquals(1*GB, app_0.getCurrentConsumption().getMemory());
|
||||||
|
assertEquals(2*GB, app_1.getCurrentConsumption().getMemory());
|
||||||
|
// user_1's apps:
|
||||||
|
assertEquals(2*GB, app_2.getCurrentConsumption().getMemory());
|
||||||
|
assertEquals(1*GB, app_3.getCurrentConsumption().getMemory());
|
||||||
|
|
||||||
|
// Release each container from app_0
|
||||||
|
for (RMContainer rmContainer : app_0.getLiveContainers()) {
|
||||||
|
e.completedContainer(clusterResource, app_0, node_0, rmContainer,
|
||||||
|
ContainerStatus.newInstance(rmContainer.getContainerId(),
|
||||||
|
ContainerState.COMPLETE, "",
|
||||||
|
ContainerExitStatus.KILLED_BY_RESOURCEMANAGER),
|
||||||
|
RMContainerEventType.KILL, null, true);
|
||||||
|
}
|
||||||
|
|
||||||
|
// Release each container from app_1
|
||||||
|
for (RMContainer rmContainer : app_1.getLiveContainers()) {
|
||||||
|
e.completedContainer(clusterResource, app_1, node_0, rmContainer,
|
||||||
|
ContainerStatus.newInstance(rmContainer.getContainerId(),
|
||||||
|
ContainerState.COMPLETE, "",
|
||||||
|
ContainerExitStatus.KILLED_BY_RESOURCEMANAGER),
|
||||||
|
RMContainerEventType.KILL, null, true);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
private List<FiCaSchedulerApp> createListOfApps(int noOfApps, String user,
|
private List<FiCaSchedulerApp> createListOfApps(int noOfApps, String user,
|
||||||
LeafQueue defaultQueue) {
|
LeafQueue defaultQueue) {
|
||||||
List<FiCaSchedulerApp> appsLists = new ArrayList<FiCaSchedulerApp>();
|
List<FiCaSchedulerApp> appsLists = new ArrayList<FiCaSchedulerApp>();
|
||||||
|
|
Loading…
Reference in New Issue