From 734bc4228974512ef8ff9fc6e339ac281d0fb1d4 Mon Sep 17 00:00:00 2001 From: Weiwei Yang Date: Fri, 10 Aug 2018 14:37:45 +0800 Subject: [PATCH] YARN-8575. Avoid committing allocation proposal to unavailable nodes in async scheduling. Contributed by Tao Yang. (cherry picked from commit 0a71bf145293adbd3728525ab4c36c08d51377d3) --- .../common/fica/FiCaSchedulerApp.java | 12 ++++ .../server/resourcemanager/MockNodes.java | 6 +- .../resourcemanager/TestResourceManager.java | 16 ++++- .../TestCapacitySchedulerAsyncScheduling.java | 69 +++++++++++++++++++ .../scheduler/capacity/TestUtils.java | 2 + 5 files changed, 100 insertions(+), 5 deletions(-) diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java index 9810e98c3e8..6a5af814ed4 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java @@ -39,6 +39,7 @@ import org.apache.hadoop.yarn.api.records.ContainerStatus; import org.apache.hadoop.yarn.api.records.NMToken; import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.NodeLabel; +import org.apache.hadoop.yarn.api.records.NodeState; import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.ResourceRequest; @@ -429,6 +430,17 @@ public class FiCaSchedulerApp extends 
SchedulerApplicationAttempt { SchedulerContainer schedulerContainer = allocation.getAllocatedOrReservedContainer(); + // Make sure node is in RUNNING state + if (schedulerContainer.getSchedulerNode().getRMNode().getState() + != NodeState.RUNNING) { + if (LOG.isDebugEnabled()) { + LOG.debug("Failed to accept this proposal because node " + + schedulerContainer.getSchedulerNode().getNodeID() + " is in " + + schedulerContainer.getSchedulerNode().getRMNode().getState() + + " state (not RUNNING)"); + } + return false; + } if (schedulerContainer.isAllocated()) { // When allocate a new container containerRequest = diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNodes.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNodes.java index 90411328f25..c444b6ee586 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNodes.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNodes.java @@ -347,17 +347,17 @@ public class MockNodes { } public static RMNode newNodeInfo(int rack, final Resource perNode, int hostnum) { - return buildRMNode(rack, perNode, null, "localhost:0", hostnum, null, 123); + return buildRMNode(rack, perNode, NodeState.RUNNING, "localhost:0", hostnum, null, 123); } public static RMNode newNodeInfo(int rack, final Resource perNode, int hostnum, String hostName) { - return buildRMNode(rack, perNode, null, "localhost:0", hostnum, hostName, 123); + return buildRMNode(rack, perNode, NodeState.RUNNING, "localhost:0", hostnum, hostName, 123); } public static RMNode newNodeInfo(int rack, final Resource perNode, int hostnum, String hostName, 
int port) { - return buildRMNode(rack, perNode, null, "localhost:0", hostnum, hostName, port); + return buildRMNode(rack, perNode, NodeState.RUNNING, "localhost:0", hostnum, hostName, port); } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceManager.java index 941e4775b2e..a66c583ccf7 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceManager.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceManager.java @@ -23,6 +23,7 @@ import static org.junit.Assert.fail; import java.io.IOException; import java.util.Collection; +import java.util.concurrent.TimeoutException; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -39,6 +40,8 @@ import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; +import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeImpl; +import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeStartedEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnScheduler; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAttemptRemovedSchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeAddedSchedulerEvent; @@ -86,8 +89,9 @@ public class TestResourceManager { } @Test - public void 
testResourceAllocation() throws IOException, - YarnException, InterruptedException { + public void testResourceAllocation() + throws IOException, YarnException, InterruptedException, + TimeoutException { LOG.info("--- START: testResourceAllocation ---"); final int memory = 4 * 1024; @@ -105,6 +109,14 @@ public class TestResourceManager { registerNode(host2, 1234, 2345, NetworkTopology.DEFAULT_RACK, Resources.createResource(memory/2, vcores/2)); + // nodes should be in RUNNING state + RMNodeImpl node1 = (RMNodeImpl) resourceManager.getRMContext().getRMNodes().get( + nm1.getNodeId()); + RMNodeImpl node2 = (RMNodeImpl) resourceManager.getRMContext().getRMNodes().get( + nm2.getNodeId()); + node1.handle(new RMNodeStartedEvent(nm1.getNodeId(), null, null)); + node2.handle(new RMNodeStartedEvent(nm2.getNodeId(), null, null)); + // Submit an application Application application = new Application("user1", resourceManager); application.submit(); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerAsyncScheduling.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerAsyncScheduling.java index c2c1519aa92..840d30dc1d1 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerAsyncScheduling.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerAsyncScheduling.java @@ -18,12 +18,14 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity; +import com.google.common.collect.ImmutableList; import 
org.apache.hadoop.conf.Configuration; import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.ContainerExitStatus; import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.ContainerState; import org.apache.hadoop.yarn.api.records.ContainerStatus; +import org.apache.hadoop.yarn.api.records.NodeState; import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.ResourceRequest; @@ -43,6 +45,8 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEven import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerImpl; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; +import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeEvent; +import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeEventType; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnScheduler; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler; @@ -745,6 +749,71 @@ public class TestCapacitySchedulerAsyncScheduling { rm1.close(); } + @Test(timeout = 30000) + public void testCommitProposalsForUnusableNode() throws Exception { + // disable async-scheduling for simulating complex scene + Configuration disableAsyncConf = new Configuration(conf); + disableAsyncConf.setBoolean( + CapacitySchedulerConfiguration.SCHEDULE_ASYNCHRONOUSLY_ENABLE, false); + + // init RM & NMs + final MockRM rm = new MockRM(disableAsyncConf); + rm.start(); + final MockNM nm1 = rm.registerNode("192.168.0.1:1234", 8 * GB); + final MockNM nm2 = rm.registerNode("192.168.0.2:2234", 8 * GB); + final MockNM nm3 = rm.registerNode("192.168.0.3:2234", 8 * GB); + rm.drainEvents(); + CapacityScheduler cs 
= + (CapacityScheduler) rm.getRMContext().getScheduler(); + SchedulerNode sn1 = cs.getSchedulerNode(nm1.getNodeId()); + + // launch app1-am on nm1 + RMApp app1 = rm.submitApp(1 * GB, "app1", "user", null, false, "default", + YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS, null, null, true, true); + MockAM am1 = MockRM.launchAndRegisterAM(app1, rm, nm1); + + // launch app2-am on nm2 + RMApp app2 = rm.submitApp(1 * GB, "app2", "user", null, false, "default", + YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS, null, null, true, true); + MockAM am2 = MockRM.launchAndRegisterAM(app2, rm, nm2); + + // app2 asks 1 * 8G container + am2.allocate(ImmutableList.of(ResourceRequest + .newInstance(Priority.newInstance(0), "*", + Resources.createResource(8 * GB), 1)), null); + + List<Object> reservedProposalParts = new ArrayList<>(); + final CapacityScheduler spyCs = Mockito.spy(cs); + // handle CapacityScheduler#tryCommit + Mockito.doAnswer(new Answer<Object>() { + public Boolean answer(InvocationOnMock invocation) throws Exception { + for (Object argument : invocation.getArguments()) { + reservedProposalParts.add(argument); + } + return false; + } + }).when(spyCs).tryCommit(Mockito.any(Resource.class), + Mockito.any(ResourceCommitRequest.class), Mockito.anyBoolean()); + + spyCs.handle(new NodeUpdateSchedulerEvent(sn1.getRMNode())); + + // decommission nm1 + RMNode rmNode1 = cs.getNode(nm1.getNodeId()).getRMNode(); + cs.getRMContext().getDispatcher().getEventHandler().handle( + new RMNodeEvent(nm1.getNodeId(), RMNodeEventType.DECOMMISSION)); + rm.drainEvents(); + Assert.assertEquals(NodeState.DECOMMISSIONED, rmNode1.getState()); + Assert.assertNull(cs.getNode(nm1.getNodeId())); + + // try commit after nm1 decommissioned + boolean isSuccess = + cs.tryCommit((Resource) reservedProposalParts.get(0), + (ResourceCommitRequest) reservedProposalParts.get(1), + (Boolean) reservedProposalParts.get(2)); + Assert.assertFalse(isSuccess); + rm.stop(); + } + private ResourceCommitRequest 
createAllocateFromReservedProposal( int containerId, Resource allocateResource, FiCaSchedulerApp schedulerApp, SchedulerNode allocateNode, SchedulerNode reservedNode, diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestUtils.java index fae63be5051..b13790d357c 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestUtils.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestUtils.java @@ -27,6 +27,7 @@ import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.NodeId; +import org.apache.hadoop.yarn.api.records.NodeState; import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.ResourceInformation; @@ -220,6 +221,7 @@ public class TestUtils { when(rmNode.getNodeAddress()).thenReturn(host+":"+port); when(rmNode.getHostName()).thenReturn(host); when(rmNode.getRackName()).thenReturn(rack); + when(rmNode.getState()).thenReturn(NodeState.RUNNING); FiCaSchedulerNode node = spy(new FiCaSchedulerNode(rmNode, false)); LOG.info("node = " + host + " avail=" + node.getUnallocatedResource());