diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java
index 184c1593d1b..192bfa0b515 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java
@@ -426,6 +426,19 @@ public class FiCaSchedulerApp extends SchedulerApplicationAttempt {
       // accepted & confirmed, it will become RESERVED state
       if (schedulerContainer.getRmContainer().getState()
           == RMContainerState.RESERVED) {
+        // Check if the node is currently reserved by another application; there
+        // may be some outdated proposals in an async-scheduling environment
+        if (schedulerContainer.getRmContainer() != schedulerContainer
+            .getSchedulerNode().getReservedContainer()) {
+          if (LOG.isDebugEnabled()) {
+            LOG.debug("Try to re-reserve a container, but node "
+                + schedulerContainer.getSchedulerNode()
+                + " is already reserved by another container "
+                + schedulerContainer.getSchedulerNode()
+                .getReservedContainer().getContainerId());
+          }
+          return false;
+        }
         // Set reReservation == true
         reReservation = true;
       } else {
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerAsyncScheduling.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerAsyncScheduling.java
index fdd320aee32..da06557fd54 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerAsyncScheduling.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerAsyncScheduling.java
@@ -20,7 +20,10 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.ContainerExitStatus;
 import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.ContainerState;
+import org.apache.hadoop.yarn.api.records.ContainerStatus;
 import org.apache.hadoop.yarn.api.records.Priority;
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.api.records.ResourceRequest;
@@ -41,21 +44,27 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnSched
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.ContainerAllocationProposal;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.ResourceCommitRequest;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.SchedulerContainer;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAttemptRemovedSchedulerEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateSchedulerEvent;
 import org.apache.hadoop.yarn.server.scheduler.SchedulerRequestKey;
 import org.apache.hadoop.yarn.util.resource.Resources;
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
+import org.mockito.Mockito;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
 
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
+import java.util.concurrent.atomic.AtomicBoolean;
 
 public class TestCapacitySchedulerAsyncScheduling {
   private final int GB = 1024;
@@ -263,6 +272,144 @@ public class TestCapacitySchedulerAsyncScheduling {
     rm.stop();
   }
 
+  // Test case for YARN-6678
+  @Test(timeout = 30000)
+  public void testCommitOutdatedReservedProposal() throws Exception {
+    // disable async-scheduling to simulate this complex scenario deterministically
+    Configuration disableAsyncConf = new Configuration(conf);
+    disableAsyncConf.setBoolean(
+        CapacitySchedulerConfiguration.SCHEDULE_ASYNCHRONOUSLY_ENABLE, false);
+
+    // init RM & NMs & Nodes
+    final MockRM rm = new MockRM(disableAsyncConf);
+    rm.start();
+    final MockNM nm1 = rm.registerNode("h1:1234", 9 * GB);
+    final MockNM nm2 = rm.registerNode("h2:2234", 9 * GB);
+
+    // init scheduler nodes
+    int waitTime = 1000;
+    while (waitTime > 0 &&
+        ((AbstractYarnScheduler) rm.getRMContext().getScheduler())
+            .getNodeTracker().nodeCount() < 2) {
+      waitTime -= 10;
+      Thread.sleep(10);
+    }
+    Assert.assertEquals(2,
+        ((AbstractYarnScheduler) rm.getRMContext().getScheduler())
+            .getNodeTracker().nodeCount());
+
+    YarnScheduler scheduler = rm.getRMContext().getScheduler();
+    final SchedulerNode sn1 =
+        ((CapacityScheduler) scheduler).getSchedulerNode(nm1.getNodeId());
+    final SchedulerNode sn2 =
+        ((CapacityScheduler) scheduler).getSchedulerNode(nm2.getNodeId());
+
+    // submit app1, am1 is running on nm1
+    RMApp app = rm.submitApp(200, "app", "user", null, "default");
+    final MockAM am = MockRM.launchAndRegisterAM(app, rm, nm1);
+    // submit app2, am2 is running on nm1
+    RMApp app2 = rm.submitApp(200, "app", "user", null, "default");
+    final MockAM am2 = MockRM.launchAndRegisterAM(app2, rm, nm1);
+
+    // allocate and launch 2 containers for app1
+    allocateAndLaunchContainers(am, nm1, rm, 1,
+        Resources.createResource(5 * GB), 0, 2);
+    allocateAndLaunchContainers(am, nm2, rm, 1,
+        Resources.createResource(5 * GB), 0, 3);
+
+    // nm1 runs 3 containers (app1-container_01/AM, app1-container_02,
+    // app2-container_01/AM)
+    // nm2 runs 1 container (app1-container_03)
+    Assert.assertEquals(3, sn1.getNumContainers());
+    Assert.assertEquals(1, sn2.getNumContainers());
+
+    // reserve 1 container (app1-container_04) for app1 on nm1
+    ResourceRequest rr2 = ResourceRequest
+        .newInstance(Priority.newInstance(0), "*",
+            Resources.createResource(5 * GB), 1);
+    am.allocate(Arrays.asList(rr2), null);
+    nm1.nodeHeartbeat(true);
+    // wait for app1-container_04 to be reserved on nm1
+    waitTime = 1000;
+    while (waitTime > 0 && sn1.getReservedContainer() == null) {
+      waitTime -= 10;
+      Thread.sleep(10);
+    }
+    Assert.assertNotNull(sn1.getReservedContainer());
+
+    final CapacityScheduler cs = (CapacityScheduler) scheduler;
+    final CapacityScheduler spyCs = Mockito.spy(cs);
+    final AtomicBoolean isFirstReserve = new AtomicBoolean(true);
+    final AtomicBoolean isChecked = new AtomicBoolean(false);
+    // intercept CapacityScheduler#tryCommit to reproduce the sequence that
+    // could previously raise an IllegalStateException
+    Mockito.doAnswer(new Answer() {
+      public Object answer(InvocationOnMock invocation) throws Exception {
+        ResourceCommitRequest request =
+            (ResourceCommitRequest) invocation.getArguments()[1];
+        if (request.getContainersToReserve().size() > 0 && isFirstReserve
+            .compareAndSet(true, false)) {
+          // release app1-container_03 on nm2
+          RMContainer killableContainer =
+              sn2.getCopiedListOfRunningContainers().get(0);
+          cs.completedContainer(killableContainer, ContainerStatus
+              .newInstance(killableContainer.getContainerId(),
+                  ContainerState.COMPLETE, "",
+                  ContainerExitStatus.KILLED_BY_RESOURCEMANAGER),
+              RMContainerEventType.KILL);
+          Assert.assertEquals(0, sn2.getCopiedListOfRunningContainers().size());
+          // unreserve app1-container_04 on nm1
+          // and allocate app1-container_05 on nm2
+          cs.handle(new NodeUpdateSchedulerEvent(sn2.getRMNode()));
+          int waitTime = 1000;
+          while (waitTime > 0
+              && sn2.getCopiedListOfRunningContainers().size() == 0) {
+            waitTime -= 10;
+            Thread.sleep(10);
+          }
+          Assert.assertEquals(1, sn2.getCopiedListOfRunningContainers().size());
+          Assert.assertNull(sn1.getReservedContainer());
+
+          // reserve app2-container_02 on nm1
+          ResourceRequest rr3 = ResourceRequest
+              .newInstance(Priority.newInstance(0), "*",
+                  Resources.createResource(5 * GB), 1);
+          am2.allocate(Arrays.asList(rr3), null);
+          cs.handle(new NodeUpdateSchedulerEvent(sn1.getRMNode()));
+          waitTime = 1000;
+          while (waitTime > 0 && sn1.getReservedContainer() == null) {
+            waitTime -= 10;
+            Thread.sleep(10);
+          }
+          Assert.assertNotNull(sn1.getReservedContainer());
+
+          // call the real tryCommit to apply the now-outdated reserve proposal
+          try {
+            cs.tryCommit((Resource) invocation.getArguments()[0],
+                (ResourceCommitRequest) invocation.getArguments()[1]);
+          } catch (Exception e) {
+            e.printStackTrace();
+            Assert.fail();
+          }
+          isChecked.set(true);
+        } else {
+          cs.tryCommit((Resource) invocation.getArguments()[0],
+              (ResourceCommitRequest) invocation.getArguments()[1]);
+        }
+        return null;
+      }
+    }).when(spyCs).tryCommit(Mockito.any(Resource.class),
+        Mockito.any(ResourceCommitRequest.class));
+
+    spyCs.handle(new NodeUpdateSchedulerEvent(sn1.getRMNode()));
+
+    waitTime = 1000;
+    while (waitTime > 0 && !isChecked.get()) {
+      waitTime -= 10;
+      Thread.sleep(10);
+    }
+    rm.stop();
+  }
+
   private void allocateAndLaunchContainers(MockAM am, MockNM nm, MockRM rm,
       int nContainer, Resource resource, int priority, int startContainerId)
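
For readers of this patch, the standalone sketch below (not part of the change) illustrates the guard the FiCaSchedulerApp hunk adds: under asynchronous scheduling, a reserve proposal produced by one scheduling thread can be outdated by the time it is committed, so the commit path must re-check that the proposal still matches the node's current reservation and reject it otherwise. The Node and ReserveProposal types here are hypothetical stand-ins for FiCaSchedulerNode and the reserve proposal, not real YARN classes.

// OutdatedReservationGuardSketch.java -- a minimal, self-contained illustration.
import java.util.Objects;

public class OutdatedReservationGuardSketch {

  /** Hypothetical stand-in for a scheduler node that can hold one reservation. */
  static final class Node {
    private String reservedContainerId;            // null when nothing is reserved
    void reserve(String containerId) { this.reservedContainerId = containerId; }
    void unreserve() { this.reservedContainerId = null; }
    String getReservedContainerId() { return reservedContainerId; }
  }

  /** Hypothetical stand-in for a re-reservation proposal produced by one async thread. */
  static final class ReserveProposal {
    final String containerId;
    ReserveProposal(String containerId) { this.containerId = containerId; }
  }

  /**
   * Accept a re-reservation only if the proposal still matches the node's
   * current reservation; otherwise the proposal is outdated and is rejected,
   * mirroring the "return false" added in FiCaSchedulerApp above.
   */
  static boolean acceptReReservation(Node node, ReserveProposal proposal) {
    return Objects.equals(proposal.containerId, node.getReservedContainerId());
  }

  public static void main(String[] args) {
    Node nm1 = new Node();
    nm1.reserve("app1-container_04");
    ReserveProposal outdated = new ReserveProposal("app1-container_04");

    // Meanwhile another scheduling pass unreserves app1's container and
    // reserves app2's, as the test above drives via NodeUpdateSchedulerEvents.
    nm1.unreserve();
    nm1.reserve("app2-container_02");

    // The first proposal is now outdated and must not be committed.
    System.out.println(acceptReReservation(nm1, outdated));   // prints: false
  }
}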