YARN-8774. Memory leak when CapacityScheduler allocates from reserved container with non-default label. Contributed by Tao Yang.

(cherry picked from commit 8598b498bc)
This commit is contained in:
Eric E Payne 2018-09-28 15:32:07 +00:00
parent 5b72aa04e1
commit c306da08ec
2 changed files with 79 additions and 1 deletions

View File

@ -751,9 +751,25 @@ public class RegularContainerAllocator extends AbstractContainerAllocator {
// When reserving container // When reserving container
RMContainer updatedContainer = reservedContainer; RMContainer updatedContainer = reservedContainer;
if (updatedContainer == null) { if (updatedContainer == null) {
AppPlacementAllocator<FiCaSchedulerNode> ps =
application.getAppSchedulingInfo()
.getAppPlacementAllocator(schedulerKey);
if (null == ps) {
LOG.warn("Failed to get " + AppPlacementAllocator.class.getName()
+ " for application=" + application.getApplicationId()
+ " schedulerRequestKey=" + schedulerKey);
ActivitiesLogger.APP
.recordAppActivityWithoutAllocation(activitiesManager, node,
application, schedulerKey.getPriority(),
ActivityDiagnosticConstant.
PRIORITY_SKIPPED_BECAUSE_NULL_ANY_REQUEST,
ActivityState.REJECTED);
return ContainerAllocation.PRIORITY_SKIPPED;
}
updatedContainer = new RMContainerImpl(container, schedulerKey, updatedContainer = new RMContainerImpl(container, schedulerKey,
application.getApplicationAttemptId(), node.getNodeID(), application.getApplicationAttemptId(), node.getNodeID(),
application.getAppSchedulingInfo().getUser(), rmContext); application.getAppSchedulingInfo().getUser(), rmContext,
ps.getPrimaryRequestedNodePartition());
} }
allocationResult.updatedContainer = updatedContainer; allocationResult.updatedContainer = updatedContainer;
} }

View File

@ -547,6 +547,68 @@ public class TestNodeLabelContainerAllocation {
rm1.close(); rm1.close();
} }
@Test (timeout = 120000)
public void testRMContainerLeakInLeafQueue() throws Exception {
// set node -> label
mgr.addToCluserNodeLabelsWithDefaultExclusivity(ImmutableSet.of("x"));
mgr.addLabelsToNode(ImmutableMap.of(NodeId.newInstance("h1", 0), toSet("x"),
NodeId.newInstance("h2", 0), toSet("x")));
// inject node label manager
MockRM rm1 =
new MockRM(TestUtils.getConfigurationWithDefaultQueueLabels(conf)) {
@Override public RMNodeLabelsManager createNodeLabelManager() {
return mgr;
}
};
rm1.getRMContext().setNodeLabelManager(mgr);
rm1.start();
MockNM nm1 = rm1.registerNode("h1:1234", 8 * GB); // label = x
rm1.registerNode("h2:1234", 8 * GB); // label = x
// launch an app to queue a1 (label = x), and check all container will
// be allocated in h1
RMApp app1 = rm1.submitApp(1 * GB, "app1", "user", null, "a1");
MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1);
RMApp app2 = rm1.submitApp(1 * GB, "app2", "user", null, "a1");
MockRM.launchAndRegisterAM(app2, rm1, nm1);
// request a container.
am1.allocate("*", 7 * GB, 2, new ArrayList<ContainerId>());
CapacityScheduler cs = (CapacityScheduler) rm1.getResourceScheduler();
RMNode rmNode1 = rm1.getRMContext().getRMNodes().get(nm1.getNodeId());
LeafQueue leafQueue = (LeafQueue) cs.getQueue("a1");
// Do node heartbeats 1 time
// scheduler will reserve a container for app1
cs.handle(new NodeUpdateSchedulerEvent(rmNode1));
// Check if a 4G container allocated for app1, and 4G is reserved
FiCaSchedulerApp schedulerApp1 =
cs.getApplicationAttempt(am1.getApplicationAttemptId());
Assert.assertEquals(1, schedulerApp1.getLiveContainers().size());
Assert.assertEquals(1, schedulerApp1.getReservedContainers().size());
// kill app2 then do node heartbeat 1 time
// scheduler will allocate a container from the reserved container on nm1
rm1.killApp(app2.getApplicationId());
rm1.waitForState(app2.getApplicationId(), RMAppState.KILLED);
cs.handle(new NodeUpdateSchedulerEvent(rmNode1));
Assert.assertEquals(2, schedulerApp1.getLiveContainers().size());
Assert.assertEquals(0, schedulerApp1.getReservedContainers().size());
// After kill app1, LeafQueue#ignorePartitionExclusivityRMContainers should
// be clean, otherwise resource leak happened
rm1.killApp(app1.getApplicationId());
rm1.waitForState(app1.getApplicationId(), RMAppState.KILLED);
Assert.assertEquals(0, leafQueue.getIgnoreExclusivityRMContainers().size());
rm1.close();
}
private void checkPendingResource(MockRM rm, int priority, private void checkPendingResource(MockRM rm, int priority,
ApplicationAttemptId attemptId, int memory) { ApplicationAttemptId attemptId, int memory) {
CapacityScheduler cs = (CapacityScheduler) rm.getRMContext().getScheduler(); CapacityScheduler cs = (CapacityScheduler) rm.getRMContext().getScheduler();