YARN-8127. Resource leak when async scheduling is enabled. Contributed by Tao Yang.

(cherry picked from commit 7eb783e263)
This commit is contained in:
Weiwei Yang 2018-04-11 17:15:25 +08:00
parent 86f543aa85
commit c2036af196
2 changed files with 101 additions and 0 deletions

View File

@ -339,6 +339,16 @@ public class FiCaSchedulerApp extends SchedulerApplicationAttempt {
return false; return false;
} }
} }
// If allocate from reserved container, make sure node is still reserved
if (allocation.getAllocateFromReservedContainer() != null
&& reservedContainerOnNode == null) {
if (LOG.isDebugEnabled()) {
LOG.debug("Try to allocate from reserved container " + allocation
.getAllocateFromReservedContainer().getRmContainer()
.getContainerId() + ", but node is not reserved");
}
return false;
}
// Do we have enough space on this node? // Do we have enough space on this node?
Resource availableResource = Resources.clone( Resource availableResource = Resources.clone(

View File

@ -594,6 +594,97 @@ public class TestCapacitySchedulerAsyncScheduling {
} }
} }
// Testcase for YARN-8127
@Test (timeout = 30000)
public void testCommitDuplicatedAllocateFromReservedProposals()
throws Exception {
// disable async-scheduling for simulating complex scene
Configuration disableAsyncConf = new Configuration(conf);
disableAsyncConf.setBoolean(
CapacitySchedulerConfiguration.SCHEDULE_ASYNCHRONOUSLY_ENABLE, false);
// init RM & NMs
final MockRM rm = new MockRM(disableAsyncConf);
rm.start();
final MockNM nm1 = rm.registerNode("192.168.0.1:1234", 8 * GB);
rm.registerNode("192.168.0.2:2234", 8 * GB);
// init scheduler & nodes
while (
((CapacityScheduler) rm.getRMContext().getScheduler()).getNodeTracker()
.nodeCount() < 2) {
Thread.sleep(10);
}
Assert.assertEquals(2,
((AbstractYarnScheduler) rm.getRMContext().getScheduler())
.getNodeTracker().nodeCount());
CapacityScheduler cs =
(CapacityScheduler) rm.getRMContext().getScheduler();
SchedulerNode sn1 = cs.getSchedulerNode(nm1.getNodeId());
// launch app
RMApp app = rm.submitApp(1 * GB, "app", "user", null, false, "default",
YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS, null, null, true, true);
MockAM am = MockRM.launchAndRegisterAM(app, rm, nm1);
FiCaSchedulerApp schedulerApp =
cs.getApplicationAttempt(am.getApplicationAttemptId());
// app asks 1 * 6G container
// nm1 runs 2 container(container_01/AM, container_02)
allocateAndLaunchContainers(am, nm1, rm, 1,
Resources.createResource(6 * GB), 0, 2);
Assert.assertEquals(2, sn1.getNumContainers());
Assert.assertEquals(1 * GB, sn1.getUnallocatedResource().getMemorySize());
// app asks 5 * 2G container
// nm1 reserves 1 * 2G containers
am.allocate(Arrays.asList(ResourceRequest
.newInstance(Priority.newInstance(0), "*",
Resources.createResource(2 * GB), 5)), null);
cs.handle(new NodeUpdateSchedulerEvent(sn1.getRMNode()));
Assert.assertEquals(1, schedulerApp.getReservedContainers().size());
// rm kills 1 * 6G container_02
for (RMContainer rmContainer : sn1.getCopiedListOfRunningContainers()) {
if (rmContainer.getContainerId().getContainerId() != 1) {
cs.completedContainer(rmContainer, ContainerStatus
.newInstance(rmContainer.getContainerId(),
ContainerState.COMPLETE, "",
ContainerExitStatus.KILLED_BY_RESOURCEMANAGER),
RMContainerEventType.KILL);
}
}
Assert.assertEquals(7 * GB, sn1.getUnallocatedResource().getMemorySize());
final CapacityScheduler spyCs = Mockito.spy(cs);
// handle CapacityScheduler#tryCommit, submit duplicated proposals
// that do allocation for reserved container for three times,
// to simulate that case in YARN-8127
Mockito.doAnswer(new Answer<Object>() {
public Boolean answer(InvocationOnMock invocation) throws Exception {
ResourceCommitRequest request =
(ResourceCommitRequest) invocation.getArguments()[1];
if (request.getFirstAllocatedOrReservedContainer()
.getAllocateFromReservedContainer() != null) {
for (int i=0; i<3; i++) {
cs.tryCommit((Resource) invocation.getArguments()[0],
(ResourceCommitRequest) invocation.getArguments()[1],
(Boolean) invocation.getArguments()[2]);
}
Assert.assertEquals(2, sn1.getCopiedListOfRunningContainers().size());
Assert.assertEquals(5 * GB,
sn1.getUnallocatedResource().getMemorySize());
}
return true;
}
}).when(spyCs).tryCommit(Mockito.any(Resource.class),
Mockito.any(ResourceCommitRequest.class), Mockito.anyBoolean());
spyCs.handle(new NodeUpdateSchedulerEvent(sn1.getRMNode()));
rm.stop();
}
private void keepNMHeartbeat(List<MockNM> mockNMs, int interval) { private void keepNMHeartbeat(List<MockNM> mockNMs, int interval) {
if (nmHeartbeatThread != null) { if (nmHeartbeatThread != null) {
nmHeartbeatThread.setShouldStop(); nmHeartbeatThread.setShouldStop();