YARN-4610. Reservations continue looking for one app causes other apps to starve. Contributed by Jason Lowe

This commit is contained in:
Jason Lowe 2016-01-21 18:35:24 +00:00
parent bade7f06e8
commit 63bb912749
3 changed files with 149 additions and 2 deletions

View File

@ -69,6 +69,9 @@ Release 2.7.3 - UNRELEASED
YARN-4581. AHS writer thread leak makes RM crash while RM is recovering.
(sandflee via junping_du)
YARN-4610. Reservations continue looking for one app causes other apps to
starve (jlowe)
Release 2.7.2 - UNRELEASED
INCOMPATIBLE CHANGES

View File

@ -769,6 +769,9 @@ public class LeafQueue extends AbstractCSQueue {
}
}
Resource initAmountNeededUnreserve =
currentResourceLimits.getAmountNeededUnreserve();
// Try to assign containers to applications in order
for (FiCaSchedulerApp application : activeApplications) {
@ -821,6 +824,9 @@ public class LeafQueue extends AbstractCSQueue {
computeUserLimitAndSetHeadroom(application, clusterResource,
required, requestedNodeLabels);
currentResourceLimits.setAmountNeededUnreserve(
initAmountNeededUnreserve);
// Check queue max-capacity limit
if (!super.canAssignToThisQueue(clusterResource, node.getLabels(),
currentResourceLimits, required, application.getCurrentReservation())) {

View File

@ -102,12 +102,17 @@ public class TestReservations {
}
private void setup(CapacitySchedulerConfiguration csConf) throws Exception {
setup(csConf, false);
}
private void setup(CapacitySchedulerConfiguration csConf,
boolean addUserLimits) throws Exception {
csConf.setBoolean("yarn.scheduler.capacity.user-metrics.enable", true);
final String newRoot = "root" + System.currentTimeMillis();
// final String newRoot = "root";
setupQueueConfiguration(csConf, newRoot);
setupQueueConfiguration(csConf, newRoot, addUserLimits);
YarnConfiguration conf = new YarnConfiguration();
cs.setConf(conf);
@ -148,7 +153,7 @@ public class TestReservations {
private static final String A = "a";
private void setupQueueConfiguration(CapacitySchedulerConfiguration conf,
final String newRoot) {
final String newRoot, boolean addUserLimits) {
// Define top-level queues
conf.setQueues(CapacitySchedulerConfiguration.ROOT,
@ -169,6 +174,10 @@ public class TestReservations {
conf.setMaximumCapacity(Q_A, 100);
conf.setAcl(Q_A, QueueACL.SUBMIT_APPLICATIONS, "*");
if (addUserLimits) {
conf.setUserLimit(Q_A, 25);
conf.setUserLimitFactor(Q_A, 0.25f);
}
}
static LeafQueue stubLeafQueue(LeafQueue queue) {
@ -354,6 +363,135 @@ public class TestReservations {
assertEquals(0, app_0.getTotalRequiredResources(priorityReduce));
}
// Test that hitting a reservation limit and needing to unreserve
// does not affect assigning containers for other users
@Test
public void testReservationLimitOtherUsers() throws Exception {
CapacitySchedulerConfiguration csConf = new CapacitySchedulerConfiguration();
setup(csConf, true);
// Manipulate queue 'a'
LeafQueue a = stubLeafQueue((LeafQueue) queues.get(A));
// Users
final String user_0 = "user_0";
final String user_1 = "user_1";
// Submit applications
final ApplicationAttemptId appAttemptId_0 = TestUtils
.getMockApplicationAttemptId(0, 0);
FiCaSchedulerApp app_0 = new FiCaSchedulerApp(appAttemptId_0, user_0, a,
mock(ActiveUsersManager.class), spyRMContext);
app_0 = spy(app_0);
rmContext.getRMApps().put(app_0.getApplicationId(), mock(RMApp.class));
a.submitApplicationAttempt(app_0, user_0);
final ApplicationAttemptId appAttemptId_1 = TestUtils
.getMockApplicationAttemptId(1, 0);
FiCaSchedulerApp app_1 = new FiCaSchedulerApp(appAttemptId_1, user_1, a,
mock(ActiveUsersManager.class), spyRMContext);
app_1 = spy(app_1);
rmContext.getRMApps().put(app_1.getApplicationId(), mock(RMApp.class));
a.submitApplicationAttempt(app_1, user_1);
// Setup some nodes
String host_0 = "host_0";
FiCaSchedulerNode node_0 = TestUtils.getMockNode(host_0, DEFAULT_RACK, 0,
8 * GB);
String host_1 = "host_1";
FiCaSchedulerNode node_1 = TestUtils.getMockNode(host_1, DEFAULT_RACK, 0,
8 * GB);
String host_2 = "host_2";
FiCaSchedulerNode node_2 = TestUtils.getMockNode(host_2, DEFAULT_RACK, 0,
8 * GB);
when(csContext.getNode(node_0.getNodeID())).thenReturn(node_0);
when(csContext.getNode(node_1.getNodeID())).thenReturn(node_1);
when(csContext.getNode(node_2.getNodeID())).thenReturn(node_2);
cs.getAllNodes().put(node_0.getNodeID(), node_0);
cs.getAllNodes().put(node_1.getNodeID(), node_1);
cs.getAllNodes().put(node_2.getNodeID(), node_2);
final int numNodes = 3;
Resource clusterResource = Resources.createResource(numNodes * (8 * GB));
when(csContext.getNumClusterNodes()).thenReturn(numNodes);
// Setup resource-requests
Priority priorityAM = TestUtils.createMockPriority(1);
Priority priorityMap = TestUtils.createMockPriority(5);
app_0.updateResourceRequests(Collections.singletonList(TestUtils
.createResourceRequest(ResourceRequest.ANY, 2 * GB, 1, true,
priorityAM, recordFactory)));
app_1.updateResourceRequests(Collections.singletonList(TestUtils
.createResourceRequest(ResourceRequest.ANY, 2 * GB, 1, true,
priorityAM, recordFactory)));
// Start testing...
// Only AM
a.assignContainers(clusterResource, node_0,
new ResourceLimits(clusterResource));
assertEquals(2 * GB, a.getUsedResources().getMemory());
assertEquals(2 * GB, app_0.getCurrentConsumption().getMemory());
assertEquals(0 * GB, app_1.getCurrentConsumption().getMemory());
assertEquals(0 * GB, a.getMetrics().getReservedMB());
assertEquals(2 * GB, a.getMetrics().getAllocatedMB());
assertEquals(22 * GB, a.getMetrics().getAvailableMB());
assertEquals(2 * GB, node_0.getUsedResource().getMemory());
assertEquals(0 * GB, node_1.getUsedResource().getMemory());
assertEquals(0 * GB, node_2.getUsedResource().getMemory());
a.assignContainers(clusterResource, node_1,
new ResourceLimits(clusterResource));
assertEquals(4 * GB, a.getUsedResources().getMemory());
assertEquals(2 * GB, app_0.getCurrentConsumption().getMemory());
assertEquals(2 * GB, app_1.getCurrentConsumption().getMemory());
assertEquals(0 * GB, a.getMetrics().getReservedMB());
assertEquals(4 * GB, a.getMetrics().getAllocatedMB());
assertEquals(20 * GB, a.getMetrics().getAvailableMB());
assertEquals(2 * GB, node_0.getUsedResource().getMemory());
assertEquals(2 * GB, node_1.getUsedResource().getMemory());
assertEquals(0 * GB, node_2.getUsedResource().getMemory());
// Add a few requests to each app
app_0.updateResourceRequests(Collections.singletonList(TestUtils
.createResourceRequest(ResourceRequest.ANY, 8 * GB, 2, true,
priorityMap, recordFactory)));
app_1.updateResourceRequests(Collections.singletonList(TestUtils
.createResourceRequest(ResourceRequest.ANY, 2 * GB, 2, true,
priorityMap, recordFactory)));
// add a reservation for app_0
a.assignContainers(clusterResource, node_0,
new ResourceLimits(clusterResource));
assertEquals(12 * GB, a.getUsedResources().getMemory());
assertEquals(2 * GB, app_0.getCurrentConsumption().getMemory());
assertEquals(2 * GB, app_1.getCurrentConsumption().getMemory());
assertEquals(8 * GB, a.getMetrics().getReservedMB());
assertEquals(4 * GB, a.getMetrics().getAllocatedMB());
assertEquals(12 * GB, a.getMetrics().getAvailableMB());
assertEquals(2 * GB, node_0.getUsedResource().getMemory());
assertEquals(2 * GB, node_1.getUsedResource().getMemory());
assertEquals(0 * GB, node_2.getUsedResource().getMemory());
// next assignment is beyond user limit for user_0 but it should assign to
// app_1 for user_1
a.assignContainers(clusterResource, node_1,
new ResourceLimits(clusterResource));
assertEquals(14 * GB, a.getUsedResources().getMemory());
assertEquals(2 * GB, app_0.getCurrentConsumption().getMemory());
assertEquals(4 * GB, app_1.getCurrentConsumption().getMemory());
assertEquals(8 * GB, a.getMetrics().getReservedMB());
assertEquals(6 * GB, a.getMetrics().getAllocatedMB());
assertEquals(10 * GB, a.getMetrics().getAvailableMB());
assertEquals(2 * GB, node_0.getUsedResource().getMemory());
assertEquals(4 * GB, node_1.getUsedResource().getMemory());
assertEquals(0 * GB, node_2.getUsedResource().getMemory());
}
@Test
public void testReservationNoContinueLook() throws Exception {
// Test that with reservations-continue-look-all-nodes feature off