diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java index 7cd2c1c08e3..57ee69026f6 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java @@ -1521,64 +1521,12 @@ public class CapacityScheduler extends return null; } - CSAssignment assignment; - // Assign new containers... // 1. Check for reserved applications // 2. Schedule if there are no reservations RMContainer reservedContainer = node.getReservedContainer(); if (reservedContainer != null) { - FiCaSchedulerApp reservedApplication = getCurrentAttemptForContainer( - reservedContainer.getContainerId()); - if (reservedApplication == null) { - LOG.error( - "Trying to schedule for a finished app, please double check. nodeId=" - + node.getNodeID() + " container=" + reservedContainer - .getContainerId()); - return null; - } - - // Try to fulfill the reservation - LOG.debug("Trying to fulfill reservation for application {} on node: {}", - reservedApplication.getApplicationId(), node.getNodeID()); - - LeafQueue queue = ((LeafQueue) reservedApplication.getQueue()); - assignment = queue.assignContainers(getClusterResource(), candidates, - // TODO, now we only consider limits for parent for non-labeled - // resources, should consider labeled resources as well. - new ResourceLimits(labelManager - .getResourceByLabel(RMNodeLabelsManager.NO_LABEL, - getClusterResource())), - SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY); - - if (assignment.isFulfilledReservation()) { - if (withNodeHeartbeat) { - // Only update SchedulerHealth in sync scheduling, existing - // Data structure of SchedulerHealth need to be updated for - // Async mode - updateSchedulerHealth(lastNodeUpdateTime, node.getNodeID(), - assignment); - } - - schedulerHealth.updateSchedulerFulfilledReservationCounts(1); - - ActivitiesLogger.QUEUE.recordQueueActivity(activitiesManager, node, - queue.getParent().getQueueName(), queue.getQueueName(), - ActivityState.ACCEPTED, ActivityDiagnosticConstant.EMPTY); - ActivitiesLogger.NODE.finishAllocatedNodeAllocation(activitiesManager, - node, reservedContainer.getContainerId(), - AllocationState.ALLOCATED_FROM_RESERVED); - } else{ - ActivitiesLogger.QUEUE.recordQueueActivity(activitiesManager, node, - queue.getParent().getQueueName(), queue.getQueueName(), - ActivityState.ACCEPTED, ActivityDiagnosticConstant.EMPTY); - ActivitiesLogger.NODE.finishAllocatedNodeAllocation(activitiesManager, - node, reservedContainer.getContainerId(), AllocationState.SKIPPED); - } - - assignment.setSchedulingMode( - SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY); - submitResourceCommitRequest(getClusterResource(), assignment); + allocateFromReservedContainer(node, withNodeHeartbeat, reservedContainer); } // Do not schedule if there are any reservations to fulfill on the node @@ -1603,6 +1551,62 @@ public class CapacityScheduler extends return allocateOrReserveNewContainers(candidates, withNodeHeartbeat); } + private void allocateFromReservedContainer(FiCaSchedulerNode node, + boolean withNodeHeartbeat, RMContainer reservedContainer) { + FiCaSchedulerApp reservedApplication = getCurrentAttemptForContainer( + reservedContainer.getContainerId()); + if (reservedApplication == null) { + LOG.error( + "Trying to schedule for a finished app, please double check. nodeId=" + + node.getNodeID() + " container=" + reservedContainer + .getContainerId()); + return; + } + + // Try to fulfill the reservation + LOG.debug("Trying to fulfill reservation for application {} on node: {}", + reservedApplication.getApplicationId(), node.getNodeID()); + + LeafQueue queue = ((LeafQueue) reservedApplication.getQueue()); + CSAssignment assignment = queue.assignContainers(getClusterResource(), + new SimpleCandidateNodeSet<>(node), + // TODO, now we only consider limits for parent for non-labeled + // resources, should consider labeled resources as well. + new ResourceLimits(labelManager + .getResourceByLabel(RMNodeLabelsManager.NO_LABEL, + getClusterResource())), + SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY); + + if (assignment.isFulfilledReservation()) { + if (withNodeHeartbeat) { + // Only update SchedulerHealth in sync scheduling, existing + // Data structure of SchedulerHealth need to be updated for + // Async mode + updateSchedulerHealth(lastNodeUpdateTime, node.getNodeID(), + assignment); + } + + schedulerHealth.updateSchedulerFulfilledReservationCounts(1); + + ActivitiesLogger.QUEUE.recordQueueActivity(activitiesManager, node, + queue.getParent().getQueueName(), queue.getQueueName(), + ActivityState.ACCEPTED, ActivityDiagnosticConstant.EMPTY); + ActivitiesLogger.NODE.finishAllocatedNodeAllocation(activitiesManager, + node, reservedContainer.getContainerId(), + AllocationState.ALLOCATED_FROM_RESERVED); + } else{ + ActivitiesLogger.QUEUE.recordQueueActivity(activitiesManager, node, + queue.getParent().getQueueName(), queue.getQueueName(), + ActivityState.ACCEPTED, ActivityDiagnosticConstant.EMPTY); + ActivitiesLogger.NODE.finishAllocatedNodeAllocation(activitiesManager, + node, reservedContainer.getContainerId(), AllocationState.SKIPPED); + } + + assignment.setSchedulingMode( + SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY); + submitResourceCommitRequest(getClusterResource(), assignment); + } + private CSAssignment allocateOrReserveNewContainers( CandidateNodeSet candidates, boolean withNodeHeartbeat) { @@ -1674,7 +1678,14 @@ public class CapacityScheduler extends && preemptionManager.getKillableResource( CapacitySchedulerConfiguration.ROOT, candidates.getPartition()) == Resources.none()) { - LOG.debug("This node or this node partition doesn't have available or" + // Try to allocate from reserved containers + for (FiCaSchedulerNode node : candidates.getAllNodes().values()) { + RMContainer reservedContainer = node.getReservedContainer(); + if (reservedContainer != null) { + allocateFromReservedContainer(node, false, reservedContainer); + } + } + LOG.debug("This node or this node partition doesn't have available or " + "killable resource"); return null; } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerMultiNodes.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerMultiNodes.java index d4335ee9775..6c9faa6785d 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerMultiNodes.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerMultiNodes.java @@ -23,6 +23,9 @@ import java.util.Iterator; import java.util.List; import java.util.Set; +import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateSchedulerEvent; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.hadoop.yarn.api.records.NodeId; @@ -163,4 +166,83 @@ public class TestCapacitySchedulerMultiNodes extends CapacitySchedulerTestBase { } rm.stop(); } + + @Test (timeout=30000) + public void testExcessReservationWillBeUnreserved() throws Exception { + CapacitySchedulerConfiguration newConf = + new CapacitySchedulerConfiguration(conf); + newConf.set(YarnConfiguration.RM_PLACEMENT_CONSTRAINTS_HANDLER, + YarnConfiguration.SCHEDULER_RM_PLACEMENT_CONSTRAINTS_HANDLER); + newConf.setInt(CapacitySchedulerConfiguration.MULTI_NODE_SORTING_POLICY_NAME + + ".resource-based.sorting-interval.ms", 0); + newConf.setMaximumApplicationMasterResourcePerQueuePercent("root.default", + 1.0f); + MockRM rm1 = new MockRM(newConf); + + rm1.start(); + MockNM nm1 = rm1.registerNode("h1:1234", 8 * GB); + MockNM nm2 = rm1.registerNode("h2:1234", 8 * GB); + + // launch an app to queue, AM container should be launched in nm1 + RMApp app1 = rm1.submitApp(5 * GB, "app", "user", null, "default"); + MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1); + + // launch another app to queue, AM container should be launched in nm2 + RMApp app2 = rm1.submitApp(5 * GB, "app", "user", null, "default"); + MockAM am2 = MockRM.launchAndRegisterAM(app2, rm1, nm2); + + CapacityScheduler cs = (CapacityScheduler) rm1.getResourceScheduler(); + RMNode rmNode1 = rm1.getRMContext().getRMNodes().get(nm1.getNodeId()); + LeafQueue leafQueue = (LeafQueue) cs.getQueue("default"); + FiCaSchedulerApp schedulerApp1 = + cs.getApplicationAttempt(am1.getApplicationAttemptId()); + FiCaSchedulerApp schedulerApp2 = + cs.getApplicationAttempt(am2.getApplicationAttemptId()); + + /* + * Verify that reserved container will be unreserved + * after its ask has been cancelled when used capacity of root queue is 1. + */ + // Ask a container with 6GB memory size for app1, + // nm1 will reserve a container for app1 + am1.allocate("*", 6 * GB, 1, new ArrayList<>()); + cs.handle(new NodeUpdateSchedulerEvent(rmNode1)); + + // Check containers of app1 and app2. + Assert.assertNotNull(cs.getNode(nm1.getNodeId()).getReservedContainer()); + Assert.assertEquals(1, schedulerApp1.getLiveContainers().size()); + Assert.assertEquals(1, schedulerApp1.getReservedContainers().size()); + Assert.assertEquals(1, schedulerApp2.getLiveContainers().size()); + + // Cancel ask of the reserved container. + am1.allocate("*", 6 * GB, 0, new ArrayList<>()); + // Ask another container with 2GB memory size for app2. + am2.allocate("*", 2 * GB, 1, new ArrayList<>()); + + // Trigger scheduling to release reserved container + // whose ask has been cancelled. + cs.handle(new NodeUpdateSchedulerEvent(rmNode1)); + Assert.assertNull(cs.getNode(nm1.getNodeId()).getReservedContainer()); + Assert.assertEquals(1, schedulerApp1.getLiveContainers().size()); + Assert.assertEquals(0, schedulerApp1.getReservedContainers().size()); + Assert.assertEquals(1, schedulerApp2.getLiveContainers().size()); + + // Trigger scheduling to allocate a container on nm1 for app2. + cs.handle(new NodeUpdateSchedulerEvent(rmNode1)); + Assert.assertNull(cs.getNode(nm1.getNodeId()).getReservedContainer()); + Assert.assertEquals(1, schedulerApp1.getLiveContainers().size()); + Assert.assertEquals(0, schedulerApp1.getReservedContainers().size()); + Assert.assertEquals(2, schedulerApp2.getLiveContainers().size()); + Assert.assertEquals(7 * GB, + cs.getNode(nm1.getNodeId()).getAllocatedResource().getMemorySize()); + Assert.assertEquals(12 * GB, + cs.getRootQueue().getQueueResourceUsage().getUsed().getMemorySize()); + Assert.assertEquals(0, + cs.getRootQueue().getQueueResourceUsage().getReserved() + .getMemorySize()); + Assert.assertEquals(0, + leafQueue.getQueueResourceUsage().getReserved().getMemorySize()); + + rm1.close(); + } }