From 7eb783e2634d8c11fb646f1f2fdf597336325312 Mon Sep 17 00:00:00 2001
From: Weiwei Yang
Date: Wed, 11 Apr 2018 17:15:25 +0800
Subject: [PATCH] YARN-8127. Resource leak when async scheduling is enabled.
 Contributed by Tao Yang.

---
 Review notes (ignored by git-am):
 - Line structure of this patch was restored from a whitespace-collapsed
   copy; indentation follows the 2-space Hadoop convention and should be
   checked against the target tree before applying.
 - Text that appears to have lost angle-bracket content (author e-mail,
   generic parameters on context lines) is left as found.
 - The new test now stops the MockRM in a finally block, uses a typed
   Answer, and fails if the stub never sees an allocate-from-reserved
   proposal. The post-image blob id on the test file's "index" line
   predates these edits.

 .../common/fica/FiCaSchedulerApp.java         |  10 ++
 .../TestCapacitySchedulerAsyncScheduling.java | 108 ++++++++++++++++++++
 2 files changed, 118 insertions(+)

diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java
index 32b2cad0ddf..3ec81915706 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java
@@ -339,6 +339,16 @@ private boolean commonCheckContainerAllocation(
         return false;
       }
     }
+    // If allocate from reserved container, make sure node is still reserved
+    if (allocation.getAllocateFromReservedContainer() != null
+        && reservedContainerOnNode == null) {
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Try to allocate from reserved container " + allocation
+            .getAllocateFromReservedContainer().getRmContainer()
+            .getContainerId() + ", but node is not reserved");
+      }
+      return false;
+    }
 
     // Do we have enough space on this node?
     Resource availableResource = Resources.clone(
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerAsyncScheduling.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerAsyncScheduling.java
index 18cd942832c..338b9f90fd1 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerAsyncScheduling.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerAsyncScheduling.java
@@ -594,6 +594,114 @@ public void setShouldStop() {
     }
   }
 
+  /**
+   * Testcase for YARN-8127: committing the same allocate-from-reserved
+   * proposal several times must allocate the reserved container only once.
+   * The duplicated proposals are rejected because the node is no longer
+   * reserved after the first commit.
+   */
+  @Test (timeout = 30000)
+  public void testCommitDuplicatedAllocateFromReservedProposals()
+      throws Exception {
+    // disable async-scheduling for simulating complex scene
+    Configuration disableAsyncConf = new Configuration(conf);
+    disableAsyncConf.setBoolean(
+        CapacitySchedulerConfiguration.SCHEDULE_ASYNCHRONOUSLY_ENABLE, false);
+
+    // init RM & NMs; stop the RM in finally so that a failed assertion
+    // cannot skip the shutdown
+    final MockRM rm = new MockRM(disableAsyncConf);
+    try {
+      rm.start();
+      final MockNM nm1 = rm.registerNode("192.168.0.1:1234", 8 * GB);
+      rm.registerNode("192.168.0.2:2234", 8 * GB);
+
+      // init scheduler & nodes
+      final CapacityScheduler cs =
+          (CapacityScheduler) rm.getRMContext().getScheduler();
+      while (cs.getNodeTracker().nodeCount() < 2) {
+        Thread.sleep(10);
+      }
+      Assert.assertEquals(2, cs.getNodeTracker().nodeCount());
+      final SchedulerNode sn1 = cs.getSchedulerNode(nm1.getNodeId());
+
+      // launch app
+      RMApp app = rm.submitApp(1 * GB, "app", "user", null, false, "default",
+          YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS, null, null, true,
+          true);
+      MockAM am = MockRM.launchAndRegisterAM(app, rm, nm1);
+      FiCaSchedulerApp schedulerApp =
+          cs.getApplicationAttempt(am.getApplicationAttemptId());
+
+      // app asks 1 * 6G container
+      // nm1 runs 2 container(container_01/AM, container_02)
+      allocateAndLaunchContainers(am, nm1, rm, 1,
+          Resources.createResource(6 * GB), 0, 2);
+      Assert.assertEquals(2, sn1.getNumContainers());
+      Assert.assertEquals(1 * GB,
+          sn1.getUnallocatedResource().getMemorySize());
+
+      // app asks 5 * 2G container
+      // nm1 reserves 1 * 2G containers
+      am.allocate(Arrays.asList(ResourceRequest
+          .newInstance(Priority.newInstance(0), "*",
+              Resources.createResource(2 * GB), 5)), null);
+      cs.handle(new NodeUpdateSchedulerEvent(sn1.getRMNode()));
+      Assert.assertEquals(1, schedulerApp.getReservedContainers().size());
+
+      // rm kills 1 * 6G container_02
+      for (RMContainer rmContainer : sn1.getCopiedListOfRunningContainers()) {
+        if (rmContainer.getContainerId().getContainerId() != 1) {
+          ContainerStatus killedStatus = ContainerStatus.newInstance(
+              rmContainer.getContainerId(), ContainerState.COMPLETE, "",
+              ContainerExitStatus.KILLED_BY_RESOURCEMANAGER);
+          cs.completedContainer(rmContainer, killedStatus,
+              RMContainerEventType.KILL);
+        }
+      }
+      Assert.assertEquals(7 * GB,
+          sn1.getUnallocatedResource().getMemorySize());
+
+      final CapacityScheduler spyCs = Mockito.spy(cs);
+      // set once the stub below has seen an allocate-from-reserved proposal,
+      // so the test cannot pass without running the assertions inside it
+      final java.util.concurrent.atomic.AtomicBoolean proposalCommitted =
+          new java.util.concurrent.atomic.AtomicBoolean(false);
+      // handle CapacityScheduler#tryCommit, submit duplicated proposals
+      // that do allocation for reserved container for three times,
+      // to simulate that case in YARN-8127
+      Mockito.doAnswer(new Answer<Boolean>() {
+        @Override
+        public Boolean answer(InvocationOnMock invocation) throws Exception {
+          Object[] args = invocation.getArguments();
+          ResourceCommitRequest request = (ResourceCommitRequest) args[1];
+          if (request.getFirstAllocatedOrReservedContainer() != null
+              && request.getFirstAllocatedOrReservedContainer()
+                  .getAllocateFromReservedContainer() != null) {
+            for (int i = 0; i < 3; i++) {
+              cs.tryCommit((Resource) args[0], request, (Boolean) args[2]);
+            }
+            // only the first commit may succeed: AM + one 2G container
+            Assert.assertEquals(2,
+                sn1.getCopiedListOfRunningContainers().size());
+            Assert.assertEquals(5 * GB,
+                sn1.getUnallocatedResource().getMemorySize());
+            proposalCommitted.set(true);
+          }
+          return true;
+        }
+      }).when(spyCs).tryCommit(Mockito.any(Resource.class),
+          Mockito.any(ResourceCommitRequest.class), Mockito.anyBoolean());
+
+      spyCs.handle(new NodeUpdateSchedulerEvent(sn1.getRMNode()));
+      Assert.assertTrue(
+          "tryCommit never saw an allocate-from-reserved proposal",
+          proposalCommitted.get());
+    } finally {
+      rm.stop();
+    }
+  }
+
   private void keepNMHeartbeat(List mockNMs, int interval) {
     if (nmHeartbeatThread != null) {
       nmHeartbeatThread.setShouldStop();