YARN-9432. Reserved containers leak after its request has been cancelled or satisfied when multi-nodes enabled. Contributed by Tao Yang.

This commit is contained in:
Weiwei Yang 2019-05-08 09:48:15 +08:00
parent 66c2a4ef89
commit c336af3847
2 changed files with 147 additions and 54 deletions

View File

@ -1521,64 +1521,12 @@ public class CapacityScheduler extends
return null;
}
CSAssignment assignment;
// Assign new containers...
// 1. Check for reserved applications
// 2. Schedule if there are no reservations
RMContainer reservedContainer = node.getReservedContainer();
if (reservedContainer != null) {
FiCaSchedulerApp reservedApplication = getCurrentAttemptForContainer(
reservedContainer.getContainerId());
if (reservedApplication == null) {
LOG.error(
"Trying to schedule for a finished app, please double check. nodeId="
+ node.getNodeID() + " container=" + reservedContainer
.getContainerId());
return null;
}
// Try to fulfill the reservation
LOG.debug("Trying to fulfill reservation for application {} on node: {}",
reservedApplication.getApplicationId(), node.getNodeID());
LeafQueue queue = ((LeafQueue) reservedApplication.getQueue());
assignment = queue.assignContainers(getClusterResource(), candidates,
// TODO, now we only consider limits for parent for non-labeled
// resources, should consider labeled resources as well.
new ResourceLimits(labelManager
.getResourceByLabel(RMNodeLabelsManager.NO_LABEL,
getClusterResource())),
SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
if (assignment.isFulfilledReservation()) {
if (withNodeHeartbeat) {
// Only update SchedulerHealth in sync scheduling, existing
// Data structure of SchedulerHealth need to be updated for
// Async mode
updateSchedulerHealth(lastNodeUpdateTime, node.getNodeID(),
assignment);
}
schedulerHealth.updateSchedulerFulfilledReservationCounts(1);
ActivitiesLogger.QUEUE.recordQueueActivity(activitiesManager, node,
queue.getParent().getQueueName(), queue.getQueueName(),
ActivityState.ACCEPTED, ActivityDiagnosticConstant.EMPTY);
ActivitiesLogger.NODE.finishAllocatedNodeAllocation(activitiesManager,
node, reservedContainer.getContainerId(),
AllocationState.ALLOCATED_FROM_RESERVED);
} else{
ActivitiesLogger.QUEUE.recordQueueActivity(activitiesManager, node,
queue.getParent().getQueueName(), queue.getQueueName(),
ActivityState.ACCEPTED, ActivityDiagnosticConstant.EMPTY);
ActivitiesLogger.NODE.finishAllocatedNodeAllocation(activitiesManager,
node, reservedContainer.getContainerId(), AllocationState.SKIPPED);
}
assignment.setSchedulingMode(
SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
submitResourceCommitRequest(getClusterResource(), assignment);
allocateFromReservedContainer(node, withNodeHeartbeat, reservedContainer);
}
// Do not schedule if there are any reservations to fulfill on the node
@ -1603,6 +1551,62 @@ public class CapacityScheduler extends
return allocateOrReserveNewContainers(candidates, withNodeHeartbeat);
}
private void allocateFromReservedContainer(FiCaSchedulerNode node,
boolean withNodeHeartbeat, RMContainer reservedContainer) {
FiCaSchedulerApp reservedApplication = getCurrentAttemptForContainer(
reservedContainer.getContainerId());
if (reservedApplication == null) {
LOG.error(
"Trying to schedule for a finished app, please double check. nodeId="
+ node.getNodeID() + " container=" + reservedContainer
.getContainerId());
return;
}
// Try to fulfill the reservation
LOG.debug("Trying to fulfill reservation for application {} on node: {}",
reservedApplication.getApplicationId(), node.getNodeID());
LeafQueue queue = ((LeafQueue) reservedApplication.getQueue());
CSAssignment assignment = queue.assignContainers(getClusterResource(),
new SimpleCandidateNodeSet<>(node),
// TODO, now we only consider limits for parent for non-labeled
// resources, should consider labeled resources as well.
new ResourceLimits(labelManager
.getResourceByLabel(RMNodeLabelsManager.NO_LABEL,
getClusterResource())),
SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
if (assignment.isFulfilledReservation()) {
if (withNodeHeartbeat) {
// Only update SchedulerHealth in sync scheduling, existing
// Data structure of SchedulerHealth need to be updated for
// Async mode
updateSchedulerHealth(lastNodeUpdateTime, node.getNodeID(),
assignment);
}
schedulerHealth.updateSchedulerFulfilledReservationCounts(1);
ActivitiesLogger.QUEUE.recordQueueActivity(activitiesManager, node,
queue.getParent().getQueueName(), queue.getQueueName(),
ActivityState.ACCEPTED, ActivityDiagnosticConstant.EMPTY);
ActivitiesLogger.NODE.finishAllocatedNodeAllocation(activitiesManager,
node, reservedContainer.getContainerId(),
AllocationState.ALLOCATED_FROM_RESERVED);
} else{
ActivitiesLogger.QUEUE.recordQueueActivity(activitiesManager, node,
queue.getParent().getQueueName(), queue.getQueueName(),
ActivityState.ACCEPTED, ActivityDiagnosticConstant.EMPTY);
ActivitiesLogger.NODE.finishAllocatedNodeAllocation(activitiesManager,
node, reservedContainer.getContainerId(), AllocationState.SKIPPED);
}
assignment.setSchedulingMode(
SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
submitResourceCommitRequest(getClusterResource(), assignment);
}
private CSAssignment allocateOrReserveNewContainers(
CandidateNodeSet<FiCaSchedulerNode> candidates,
boolean withNodeHeartbeat) {
@ -1674,7 +1678,14 @@ public class CapacityScheduler extends
&& preemptionManager.getKillableResource(
CapacitySchedulerConfiguration.ROOT, candidates.getPartition())
== Resources.none()) {
LOG.debug("This node or this node partition doesn't have available or"
// Try to allocate from reserved containers
for (FiCaSchedulerNode node : candidates.getAllNodes().values()) {
RMContainer reservedContainer = node.getReservedContainer();
if (reservedContainer != null) {
allocateFromReservedContainer(node, false, reservedContainer);
}
}
LOG.debug("This node or this node partition doesn't have available or "
+ "killable resource");
return null;
}

View File

@ -23,6 +23,9 @@ import java.util.Iterator;
import java.util.List;
import java.util.Set;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateSchedulerEvent;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.yarn.api.records.NodeId;
@ -163,4 +166,83 @@ public class TestCapacitySchedulerMultiNodes extends CapacitySchedulerTestBase {
}
rm.stop();
}
@Test (timeout=30000)
public void testExcessReservationWillBeUnreserved() throws Exception {
CapacitySchedulerConfiguration newConf =
new CapacitySchedulerConfiguration(conf);
newConf.set(YarnConfiguration.RM_PLACEMENT_CONSTRAINTS_HANDLER,
YarnConfiguration.SCHEDULER_RM_PLACEMENT_CONSTRAINTS_HANDLER);
newConf.setInt(CapacitySchedulerConfiguration.MULTI_NODE_SORTING_POLICY_NAME
+ ".resource-based.sorting-interval.ms", 0);
newConf.setMaximumApplicationMasterResourcePerQueuePercent("root.default",
1.0f);
MockRM rm1 = new MockRM(newConf);
rm1.start();
MockNM nm1 = rm1.registerNode("h1:1234", 8 * GB);
MockNM nm2 = rm1.registerNode("h2:1234", 8 * GB);
// launch an app to queue, AM container should be launched in nm1
RMApp app1 = rm1.submitApp(5 * GB, "app", "user", null, "default");
MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1);
// launch another app to queue, AM container should be launched in nm2
RMApp app2 = rm1.submitApp(5 * GB, "app", "user", null, "default");
MockAM am2 = MockRM.launchAndRegisterAM(app2, rm1, nm2);
CapacityScheduler cs = (CapacityScheduler) rm1.getResourceScheduler();
RMNode rmNode1 = rm1.getRMContext().getRMNodes().get(nm1.getNodeId());
LeafQueue leafQueue = (LeafQueue) cs.getQueue("default");
FiCaSchedulerApp schedulerApp1 =
cs.getApplicationAttempt(am1.getApplicationAttemptId());
FiCaSchedulerApp schedulerApp2 =
cs.getApplicationAttempt(am2.getApplicationAttemptId());
/*
* Verify that reserved container will be unreserved
* after its ask has been cancelled when used capacity of root queue is 1.
*/
// Ask a container with 6GB memory size for app1,
// nm1 will reserve a container for app1
am1.allocate("*", 6 * GB, 1, new ArrayList<>());
cs.handle(new NodeUpdateSchedulerEvent(rmNode1));
// Check containers of app1 and app2.
Assert.assertNotNull(cs.getNode(nm1.getNodeId()).getReservedContainer());
Assert.assertEquals(1, schedulerApp1.getLiveContainers().size());
Assert.assertEquals(1, schedulerApp1.getReservedContainers().size());
Assert.assertEquals(1, schedulerApp2.getLiveContainers().size());
// Cancel ask of the reserved container.
am1.allocate("*", 6 * GB, 0, new ArrayList<>());
// Ask another container with 2GB memory size for app2.
am2.allocate("*", 2 * GB, 1, new ArrayList<>());
// Trigger scheduling to release reserved container
// whose ask has been cancelled.
cs.handle(new NodeUpdateSchedulerEvent(rmNode1));
Assert.assertNull(cs.getNode(nm1.getNodeId()).getReservedContainer());
Assert.assertEquals(1, schedulerApp1.getLiveContainers().size());
Assert.assertEquals(0, schedulerApp1.getReservedContainers().size());
Assert.assertEquals(1, schedulerApp2.getLiveContainers().size());
// Trigger scheduling to allocate a container on nm1 for app2.
cs.handle(new NodeUpdateSchedulerEvent(rmNode1));
Assert.assertNull(cs.getNode(nm1.getNodeId()).getReservedContainer());
Assert.assertEquals(1, schedulerApp1.getLiveContainers().size());
Assert.assertEquals(0, schedulerApp1.getReservedContainers().size());
Assert.assertEquals(2, schedulerApp2.getLiveContainers().size());
Assert.assertEquals(7 * GB,
cs.getNode(nm1.getNodeId()).getAllocatedResource().getMemorySize());
Assert.assertEquals(12 * GB,
cs.getRootQueue().getQueueResourceUsage().getUsed().getMemorySize());
Assert.assertEquals(0,
cs.getRootQueue().getQueueResourceUsage().getReserved()
.getMemorySize());
Assert.assertEquals(0,
leafQueue.getQueueResourceUsage().getReserved().getMemorySize());
rm1.close();
}
}