YARN-8546. Resource leak caused by a reserved container being released more than once under async scheduling. Contributed by Tao Yang.

(cherry picked from commit 5be9f4a5d0)
This commit is contained in:
Weiwei Yang 2018-07-25 17:35:27 +08:00 committed by Eric Payne
parent dc03afc7df
commit bdd396b26d
2 changed files with 111 additions and 0 deletions

View File

@ -366,6 +366,21 @@ public class FiCaSchedulerApp extends SchedulerApplicationAttempt {
.isEmpty()) { .isEmpty()) {
for (SchedulerContainer<FiCaSchedulerApp, FiCaSchedulerNode> for (SchedulerContainer<FiCaSchedulerApp, FiCaSchedulerNode>
releaseContainer : allocation.getToRelease()) { releaseContainer : allocation.getToRelease()) {
// Make sure to-release reserved containers are not outdated
if (releaseContainer.getRmContainer().getState()
== RMContainerState.RESERVED
&& releaseContainer.getRmContainer() != releaseContainer
.getSchedulerNode().getReservedContainer()) {
if (LOG.isDebugEnabled()) {
LOG.debug("Failed to accept this proposal because "
+ "it tries to release an outdated reserved container "
+ releaseContainer.getRmContainer().getContainerId()
+ " on node " + releaseContainer.getSchedulerNode().getNodeID()
+ " whose reserved container is "
+ releaseContainer.getSchedulerNode().getReservedContainer());
}
return false;
}
// Only consider non-reserved container (reserved container will // Only consider non-reserved container (reserved container will
// not affect available resource of node) on the same node // not affect available resource of node) on the same node
if (releaseContainer.getRmContainer().getState() if (releaseContainer.getRmContainer().getState()

View File

@ -33,6 +33,7 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.server.resourcemanager.MockAM; import org.apache.hadoop.yarn.server.resourcemanager.MockAM;
import org.apache.hadoop.yarn.server.resourcemanager.MockNM; import org.apache.hadoop.yarn.server.resourcemanager.MockNM;
import org.apache.hadoop.yarn.server.resourcemanager.MockRM; import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.NullRMNodeLabelsManager; import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.NullRMNodeLabelsManager;
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager; import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
@ -669,6 +670,101 @@ public class TestCapacitySchedulerAsyncScheduling {
rm.stop(); rm.stop();
} }
@Test(timeout = 60000)
public void testReleaseOutdatedReservedContainer() throws Exception {
/*
* Submit a application, reserved container_02 on nm1,
* submit two allocate proposals which contain the same reserved
* container_02 as to-released container.
* First proposal should be accepted, second proposal should be rejected
* because it tries to release an outdated reserved container
*/
MockRM rm1 = new MockRM();
rm1.getRMContext().setNodeLabelManager(mgr);
rm1.start();
MockNM nm1 = rm1.registerNode("h1:1234", 8 * GB);
MockNM nm2 = rm1.registerNode("h2:1234", 8 * GB);
MockNM nm3 = rm1.registerNode("h3:1234", 8 * GB);
rm1.drainEvents();
CapacityScheduler cs = (CapacityScheduler) rm1.getResourceScheduler();
RMNode rmNode1 = rm1.getRMContext().getRMNodes().get(nm1.getNodeId());
LeafQueue defaultQueue = (LeafQueue) cs.getQueue("default");
SchedulerNode sn1 = cs.getSchedulerNode(nm1.getNodeId());
SchedulerNode sn2 = cs.getSchedulerNode(nm2.getNodeId());
SchedulerNode sn3 = cs.getSchedulerNode(nm3.getNodeId());
// launch another app to queue, AM container should be launched in nm1
RMApp app1 = rm1.submitApp(4 * GB, "app", "user", null, "default");
MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1);
Resource allocateResource = Resources.createResource(5 * GB);
am1.allocate("*", (int) allocateResource.getMemorySize(), 3, 0,
new ArrayList<ContainerId>(), "");
FiCaSchedulerApp schedulerApp1 =
cs.getApplicationAttempt(am1.getApplicationAttemptId());
cs.handle(new NodeUpdateSchedulerEvent(rmNode1));
Assert.assertEquals(1, schedulerApp1.getReservedContainers().size());
Assert.assertEquals(9 * GB,
defaultQueue.getQueueResourceUsage().getUsed().getMemorySize());
RMContainer reservedContainer =
schedulerApp1.getReservedContainers().get(0);
ResourceCommitRequest allocateFromSameReservedContainerProposal1 =
createAllocateFromReservedProposal(3, allocateResource, schedulerApp1,
sn2, sn1, cs.getRMContext(), reservedContainer);
Assert.assertNotNull("Container should be reserved",
sn1.getReservedContainer());;
Assert.assertEquals("No memory should be used on " + sn2.getNodeName(),
0, sn2.getAllocatedResource().getMemorySize());
cs.tryCommit(cs.getClusterResource(),
allocateFromSameReservedContainerProposal1);
Assert.assertNull("Container should have been unreserved",
sn1.getReservedContainer());;
Assert.assertEquals("Memory should be used on " + sn2.getNodeName(),
allocateResource.getMemorySize(),
sn2.getAllocatedResource().getMemorySize());
ResourceCommitRequest allocateFromSameReservedContainerProposal2 =
createAllocateFromReservedProposal(4, allocateResource, schedulerApp1,
sn3, sn1, cs.getRMContext(), reservedContainer);
cs.tryCommit(cs.getClusterResource(),
allocateFromSameReservedContainerProposal2);
Assert.assertFalse("This proposal should be rejected because "
+ "it tries to release an outdated reserved container",
sn3.getAllocatedResource().getMemorySize() != 0);
rm1.close();
}
private ResourceCommitRequest createAllocateFromReservedProposal(
int containerId, Resource allocateResource, FiCaSchedulerApp schedulerApp,
SchedulerNode allocateNode, SchedulerNode reservedNode,
RMContext rmContext, RMContainer reservedContainer) {
Container container = Container.newInstance(
ContainerId.newContainerId(schedulerApp.getApplicationAttemptId(), containerId),
allocateNode.getNodeID(), allocateNode.getHttpAddress(), allocateResource,
Priority.newInstance(0), null);
RMContainer rmContainer = new RMContainerImpl(container, SchedulerRequestKey
.create(ResourceRequest
.newInstance(Priority.newInstance(0), "*", allocateResource, 1)),
schedulerApp.getApplicationAttemptId(), allocateNode.getNodeID(), "user",
rmContext);
SchedulerContainer allocateContainer =
new SchedulerContainer(schedulerApp, allocateNode, rmContainer, "", true);
SchedulerContainer reservedSchedulerContainer =
new SchedulerContainer(schedulerApp, reservedNode, reservedContainer, "",
false);
List<SchedulerContainer> toRelease = new ArrayList<>();
toRelease.add(reservedSchedulerContainer);
ContainerAllocationProposal allocateFromReservedProposal =
new ContainerAllocationProposal(allocateContainer, toRelease, null,
NodeType.OFF_SWITCH, NodeType.OFF_SWITCH,
SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY, allocateResource);
List<ContainerAllocationProposal> allocateProposals = new ArrayList<>();
allocateProposals.add(allocateFromReservedProposal);
return new ResourceCommitRequest(allocateProposals, null, null);
}
private void allocateAndLaunchContainers(MockAM am, MockNM nm, MockRM rm, private void allocateAndLaunchContainers(MockAM am, MockNM nm, MockRM rm,
int nContainer, Resource resource, int priority, int startContainerId) int nContainer, Resource resource, int priority, int startContainerId)
throws Exception { throws Exception {