YARN-8546. Resource leak caused by a reserved container being released more than once under async scheduling. Contributed by Tao Yang.
This commit is contained in:
parent
955f795101
commit
5be9f4a5d0
|
@ -361,6 +361,21 @@ public class FiCaSchedulerApp extends SchedulerApplicationAttempt {
|
||||||
.isEmpty()) {
|
.isEmpty()) {
|
||||||
for (SchedulerContainer<FiCaSchedulerApp, FiCaSchedulerNode>
|
for (SchedulerContainer<FiCaSchedulerApp, FiCaSchedulerNode>
|
||||||
releaseContainer : allocation.getToRelease()) {
|
releaseContainer : allocation.getToRelease()) {
|
||||||
|
// Make sure to-release reserved containers are not outdated
|
||||||
|
if (releaseContainer.getRmContainer().getState()
|
||||||
|
== RMContainerState.RESERVED
|
||||||
|
&& releaseContainer.getRmContainer() != releaseContainer
|
||||||
|
.getSchedulerNode().getReservedContainer()) {
|
||||||
|
if (LOG.isDebugEnabled()) {
|
||||||
|
LOG.debug("Failed to accept this proposal because "
|
||||||
|
+ "it tries to release an outdated reserved container "
|
||||||
|
+ releaseContainer.getRmContainer().getContainerId()
|
||||||
|
+ " on node " + releaseContainer.getSchedulerNode().getNodeID()
|
||||||
|
+ " whose reserved container is "
|
||||||
|
+ releaseContainer.getSchedulerNode().getReservedContainer());
|
||||||
|
}
|
||||||
|
return false;
|
||||||
|
}
|
||||||
// Only consider non-reserved container (reserved container will
|
// Only consider non-reserved container (reserved container will
|
||||||
// not affect available resource of node) on the same node
|
// not affect available resource of node) on the same node
|
||||||
if (releaseContainer.getRmContainer().getState()
|
if (releaseContainer.getRmContainer().getState()
|
||||||
|
|
|
@ -31,6 +31,7 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.MockAM;
|
import org.apache.hadoop.yarn.server.resourcemanager.MockAM;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.MockNM;
|
import org.apache.hadoop.yarn.server.resourcemanager.MockNM;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
|
import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.NullRMNodeLabelsManager;
|
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.NullRMNodeLabelsManager;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
|
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
|
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
|
||||||
|
@ -41,6 +42,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEven
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEventType;
|
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEventType;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerImpl;
|
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerImpl;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState;
|
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState;
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnScheduler;
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnScheduler;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType;
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
|
||||||
|
@ -685,6 +687,93 @@ public class TestCapacitySchedulerAsyncScheduling {
|
||||||
rm.stop();
|
rm.stop();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
@Test(timeout = 60000)
|
||||||
|
public void testReleaseOutdatedReservedContainer() throws Exception {
|
||||||
|
/*
|
||||||
|
* Submit a application, reserved container_02 on nm1,
|
||||||
|
* submit two allocate proposals which contain the same reserved
|
||||||
|
* container_02 as to-released container.
|
||||||
|
* First proposal should be accepted, second proposal should be rejected
|
||||||
|
* because it try to release an outdated reserved container
|
||||||
|
*/
|
||||||
|
MockRM rm1 = new MockRM();
|
||||||
|
rm1.getRMContext().setNodeLabelManager(mgr);
|
||||||
|
rm1.start();
|
||||||
|
MockNM nm1 = rm1.registerNode("h1:1234", 8 * GB);
|
||||||
|
MockNM nm2 = rm1.registerNode("h2:1234", 8 * GB);
|
||||||
|
MockNM nm3 = rm1.registerNode("h3:1234", 8 * GB);
|
||||||
|
rm1.drainEvents();
|
||||||
|
|
||||||
|
CapacityScheduler cs = (CapacityScheduler) rm1.getResourceScheduler();
|
||||||
|
RMNode rmNode1 = rm1.getRMContext().getRMNodes().get(nm1.getNodeId());
|
||||||
|
LeafQueue defaultQueue = (LeafQueue) cs.getQueue("default");
|
||||||
|
SchedulerNode sn1 = cs.getSchedulerNode(nm1.getNodeId());
|
||||||
|
SchedulerNode sn2 = cs.getSchedulerNode(nm2.getNodeId());
|
||||||
|
SchedulerNode sn3 = cs.getSchedulerNode(nm3.getNodeId());
|
||||||
|
|
||||||
|
// launch another app to queue, AM container should be launched in nm1
|
||||||
|
RMApp app1 = rm1.submitApp(4 * GB, "app", "user", null, "default");
|
||||||
|
MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1);
|
||||||
|
Resource allocateResource = Resources.createResource(5 * GB);
|
||||||
|
am1.allocate("*", (int) allocateResource.getMemorySize(), 3, 0,
|
||||||
|
new ArrayList<ContainerId>(), "");
|
||||||
|
FiCaSchedulerApp schedulerApp1 =
|
||||||
|
cs.getApplicationAttempt(am1.getApplicationAttemptId());
|
||||||
|
|
||||||
|
cs.handle(new NodeUpdateSchedulerEvent(rmNode1));
|
||||||
|
Assert.assertEquals(1, schedulerApp1.getReservedContainers().size());
|
||||||
|
Assert.assertEquals(9 * GB,
|
||||||
|
defaultQueue.getQueueResourceUsage().getUsed().getMemorySize());
|
||||||
|
|
||||||
|
RMContainer reservedContainer =
|
||||||
|
schedulerApp1.getReservedContainers().get(0);
|
||||||
|
ResourceCommitRequest allocateFromSameReservedContainerProposal1 =
|
||||||
|
createAllocateFromReservedProposal(3, allocateResource, schedulerApp1,
|
||||||
|
sn2, sn1, cs.getRMContext(), reservedContainer);
|
||||||
|
boolean tryCommitResult = cs.tryCommit(cs.getClusterResource(),
|
||||||
|
allocateFromSameReservedContainerProposal1, true);
|
||||||
|
Assert.assertTrue(tryCommitResult);
|
||||||
|
ResourceCommitRequest allocateFromSameReservedContainerProposal2 =
|
||||||
|
createAllocateFromReservedProposal(4, allocateResource, schedulerApp1,
|
||||||
|
sn3, sn1, cs.getRMContext(), reservedContainer);
|
||||||
|
tryCommitResult = cs.tryCommit(cs.getClusterResource(),
|
||||||
|
allocateFromSameReservedContainerProposal2, true);
|
||||||
|
Assert.assertFalse("This proposal should be rejected because "
|
||||||
|
+ "it try to release an outdated reserved container", tryCommitResult);
|
||||||
|
|
||||||
|
rm1.close();
|
||||||
|
}
|
||||||
|
|
||||||
|
private ResourceCommitRequest createAllocateFromReservedProposal(
|
||||||
|
int containerId, Resource allocateResource, FiCaSchedulerApp schedulerApp,
|
||||||
|
SchedulerNode allocateNode, SchedulerNode reservedNode,
|
||||||
|
RMContext rmContext, RMContainer reservedContainer) {
|
||||||
|
Container container = Container.newInstance(
|
||||||
|
ContainerId.newContainerId(schedulerApp.getApplicationAttemptId(), containerId),
|
||||||
|
allocateNode.getNodeID(), allocateNode.getHttpAddress(), allocateResource,
|
||||||
|
Priority.newInstance(0), null);
|
||||||
|
RMContainer rmContainer = new RMContainerImpl(container, SchedulerRequestKey
|
||||||
|
.create(ResourceRequest
|
||||||
|
.newInstance(Priority.newInstance(0), "*", allocateResource, 1)),
|
||||||
|
schedulerApp.getApplicationAttemptId(), allocateNode.getNodeID(), "user",
|
||||||
|
rmContext);
|
||||||
|
SchedulerContainer allocateContainer =
|
||||||
|
new SchedulerContainer(schedulerApp, allocateNode, rmContainer, "", true);
|
||||||
|
SchedulerContainer reservedSchedulerContainer =
|
||||||
|
new SchedulerContainer(schedulerApp, reservedNode, reservedContainer, "",
|
||||||
|
false);
|
||||||
|
List<SchedulerContainer> toRelease = new ArrayList<>();
|
||||||
|
toRelease.add(reservedSchedulerContainer);
|
||||||
|
ContainerAllocationProposal allocateFromReservedProposal =
|
||||||
|
new ContainerAllocationProposal(allocateContainer, toRelease, null,
|
||||||
|
NodeType.OFF_SWITCH, NodeType.OFF_SWITCH,
|
||||||
|
SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY, allocateResource);
|
||||||
|
List<ContainerAllocationProposal> allocateProposals = new ArrayList<>();
|
||||||
|
allocateProposals.add(allocateFromReservedProposal);
|
||||||
|
return new ResourceCommitRequest(allocateProposals, null, null);
|
||||||
|
}
|
||||||
|
|
||||||
private void keepNMHeartbeat(List<MockNM> mockNMs, int interval) {
|
private void keepNMHeartbeat(List<MockNM> mockNMs, int interval) {
|
||||||
if (nmHeartbeatThread != null) {
|
if (nmHeartbeatThread != null) {
|
||||||
nmHeartbeatThread.setShouldStop();
|
nmHeartbeatThread.setShouldStop();
|
||||||
|
|
Loading…
Reference in New Issue