From f97bd6bb7f7b4e7a99a572d9d9f9232702ea8318 Mon Sep 17 00:00:00 2001 From: Robert Kanter Date: Mon, 4 Jun 2018 15:59:27 -0700 Subject: [PATCH] YARN-4677. RMNodeResourceUpdateEvent update from scheduler can lead to race condition (wilfreds and gphillips via rkanter) --- .../scheduler/AbstractYarnScheduler.java | 22 +++++--- .../scheduler/fair/FairScheduler.java | 2 +- .../scheduler/fifo/FifoScheduler.java | 4 +- .../capacity/TestCapacityScheduler.java | 51 +++++++++++------ .../scheduler/fair/TestFairScheduler.java | 43 +++++++++++---- .../scheduler/fifo/TestFifoScheduler.java | 55 +++++++++++++------ 6 files changed, 122 insertions(+), 55 deletions(-) diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java index fadab6ad882..4f51e4e2f1b 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java @@ -1087,6 +1087,10 @@ public abstract class AbstractYarnScheduler // Process new container information List completedContainers = updateNewContainerInfo(nm); + // NOTICE: it is possible to not find the NodeID as a node can be + // decommissioned at the same time. Skip updates if node is null. + SchedulerNode schedulerNode = getNode(nm.getNodeID()); + // Process completed containers Resource releasedResources = Resource.newInstance(0, 0); int releasedContainers = updateCompletedContainers(completedContainers, @@ -1095,26 +1099,26 @@ public abstract class AbstractYarnScheduler // If the node is decommissioning, send an update to have the total // resource equal to the used resource, so no available resource to // schedule. - // TODO YARN-5128: Fix possible race-condition when request comes in before - // update is propagated - if (nm.getState() == NodeState.DECOMMISSIONING) { + if (nm.getState() == NodeState.DECOMMISSIONING && schedulerNode != null) { this.rmContext .getDispatcher() .getEventHandler() .handle( new RMNodeResourceUpdateEvent(nm.getNodeID(), ResourceOption - .newInstance(getSchedulerNode(nm.getNodeID()) - .getAllocatedResource(), 0))); + .newInstance(schedulerNode.getAllocatedResource(), 0))); } updateSchedulerHealthInformation(releasedResources, releasedContainers); - updateNodeResourceUtilization(nm); + if (schedulerNode != null) { + updateNodeResourceUtilization(nm); + } // Now node data structures are up-to-date and ready for scheduling. if(LOG.isDebugEnabled()) { - SchedulerNode node = getNode(nm.getNodeID()); - LOG.debug("Node being looked for scheduling " + nm + - " availableResource: " + node.getUnallocatedResource()); + LOG.debug( + "Node being looked for scheduling " + nm + " availableResource: " + + (schedulerNode == null ? "unknown (decomissioned)" : + schedulerNode.getUnallocatedResource())); } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java index b7fcbe833c7..04aae19199c 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java @@ -1009,7 +1009,7 @@ public class FairScheduler extends return; } - final NodeId nodeID = node.getNodeID(); + final NodeId nodeID = (node != null ? node.getNodeID() : null); if (!nodeTracker.exists(nodeID)) { // The node might have just been removed while this thread was waiting // on the synchronized lock before it entered this synchronized method diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java index ac1d005555b..5c923552fc2 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java @@ -970,7 +970,9 @@ public class FifoScheduler extends return; } - if (Resources.greaterThanOrEqual(resourceCalculator, getClusterResource(), + // A decommissioned node might be removed before we get here + if (node != null && + Resources.greaterThanOrEqual(resourceCalculator, getClusterResource(), node.getUnallocatedResource(), minimumAllocation)) { LOG.debug("Node heartbeat " + nm.getNodeID() + " available resource = " + node.getUnallocatedResource()); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java index 396a2aa0265..72dcdb5e6f0 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java @@ -280,14 +280,11 @@ public class TestCapacityScheduler { } } - private NodeManager - registerNode(String hostName, int containerManagerPort, int httpPort, - String rackName, Resource capability) + private NodeManager registerNode(String hostName, int containerManagerPort, + int httpPort, String rackName, Resource capability) throws IOException, YarnException { - NodeManager nm = - new NodeManager( - hostName, containerManagerPort, httpPort, rackName, capability, - resourceManager); + NodeManager nm = new NodeManager(hostName, containerManagerPort, httpPort, + rackName, capability, resourceManager); NodeAddedSchedulerEvent nodeAddEvent1 = new NodeAddedSchedulerEvent(resourceManager.getRMContext() .getRMNodes().get(nm.getNodeId())); @@ -302,15 +299,13 @@ public class TestCapacityScheduler { // Register node1 String host_0 = "host_0"; - org.apache.hadoop.yarn.server.resourcemanager.NodeManager nm_0 = - registerNode(host_0, 1234, 2345, NetworkTopology.DEFAULT_RACK, - Resources.createResource(4 * GB, 1)); + NodeManager nm_0 = registerNode(host_0, 1234, 2345, + NetworkTopology.DEFAULT_RACK, Resources.createResource(4 * GB, 1)); // Register node2 String host_1 = "host_1"; - org.apache.hadoop.yarn.server.resourcemanager.NodeManager nm_1 = - registerNode(host_1, 1234, 2345, NetworkTopology.DEFAULT_RACK, - Resources.createResource(2 * GB, 1)); + NodeManager nm_1 = registerNode(host_1, 1234, 2345, + NetworkTopology.DEFAULT_RACK, Resources.createResource(2 * GB, 1)); // ResourceRequest priorities Priority priority_0 = Priority.newInstance(0); @@ -4084,6 +4079,31 @@ public class TestCapacityScheduler { } } + @Test + public void testRemovedNodeDecomissioningNode() throws Exception { + // Register nodemanager + NodeManager nm = registerNode("host_decom", 1234, 2345, + NetworkTopology.DEFAULT_RACK, Resources.createResource(8 * GB, 4)); + + RMNode node = + resourceManager.getRMContext().getRMNodes().get(nm.getNodeId()); + // Send a heartbeat to kick the tires on the Scheduler + NodeUpdateSchedulerEvent nodeUpdate = new NodeUpdateSchedulerEvent(node); + resourceManager.getResourceScheduler().handle(nodeUpdate); + + // force remove the node to simulate race condition + ((CapacityScheduler) resourceManager.getResourceScheduler()). + getNodeTracker().removeNode(nm.getNodeId()); + // Kick off another heartbeat with the node state mocked to decommissioning + RMNode spyNode = + Mockito.spy(resourceManager.getRMContext().getRMNodes() + .get(nm.getNodeId())); + when(spyNode.getState()).thenReturn(NodeState.DECOMMISSIONING); + resourceManager.getResourceScheduler().handle( + new NodeUpdateSchedulerEvent(spyNode)); + } + + @Test public void testResourceUpdateDecommissioningNode() throws Exception { // Mock the RMNodeResourceUpdate event handler to update SchedulerNode @@ -4109,9 +4129,8 @@ public class TestCapacityScheduler { ((AsyncDispatcher) mockDispatcher).start(); // Register node String host_0 = "host_0"; - org.apache.hadoop.yarn.server.resourcemanager.NodeManager nm_0 = - registerNode(host_0, 1234, 2345, NetworkTopology.DEFAULT_RACK, - Resources.createResource(8 * GB, 4)); + NodeManager nm_0 = registerNode(host_0, 1234, 2345, + NetworkTopology.DEFAULT_RACK, Resources.createResource(8 * GB, 4)); // ResourceRequest priorities Priority priority_0 = Priority.newInstance(0); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java index 3bc1216241d..da3f1609ff2 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java @@ -77,6 +77,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.MockAM; import org.apache.hadoop.yarn.server.resourcemanager.MockNM; import org.apache.hadoop.yarn.server.resourcemanager.MockNodes; import org.apache.hadoop.yarn.server.resourcemanager.MockRM; +import org.apache.hadoop.yarn.server.resourcemanager.NodeManager; import org.apache.hadoop.yarn.server.resourcemanager.RMContext; import org.apache.hadoop.yarn.server.resourcemanager.RMContextImpl; import org.apache.hadoop.yarn.server.resourcemanager.recovery.MemoryRMStateStore; @@ -4967,6 +4968,30 @@ public class TestFairScheduler extends FairSchedulerTestBase { .get(attId3.getApplicationId()).getQueue()); } + @Test + public void testRemovedNodeDecomissioningNode() throws Exception { + // Register nodemanager + NodeManager nm = registerNode("host_decom", 1234, 2345, + NetworkTopology.DEFAULT_RACK, Resources.createResource(8 * GB, 4)); + + RMNode node = + resourceManager.getRMContext().getRMNodes().get(nm.getNodeId()); + // Send a heartbeat to kick the tires on the Scheduler + NodeUpdateSchedulerEvent nodeUpdate = new NodeUpdateSchedulerEvent(node); + resourceManager.getResourceScheduler().handle(nodeUpdate); + + // Force remove the node to simulate race condition + ((FairScheduler) resourceManager.getResourceScheduler()) + .getNodeTracker().removeNode(nm.getNodeId()); + // Kick off another heartbeat with the node state mocked to decommissioning + RMNode spyNode = + Mockito.spy(resourceManager.getRMContext().getRMNodes() + .get(nm.getNodeId())); + when(spyNode.getState()).thenReturn(NodeState.DECOMMISSIONING); + resourceManager.getResourceScheduler().handle( + new NodeUpdateSchedulerEvent(spyNode)); + } + @Test public void testResourceUpdateDecommissioningNode() throws Exception { // Mock the RMNodeResourceUpdate event handler to update SchedulerNode @@ -4992,9 +5017,8 @@ public class TestFairScheduler extends FairSchedulerTestBase { ((AsyncDispatcher) mockDispatcher).start(); // Register node String host_0 = "host_0"; - org.apache.hadoop.yarn.server.resourcemanager.NodeManager nm_0 = - registerNode(host_0, 1234, 2345, NetworkTopology.DEFAULT_RACK, - Resources.createResource(8 * GB, 4)); + NodeManager nm_0 = registerNode(host_0, 1234, 2345, + NetworkTopology.DEFAULT_RACK, Resources.createResource(8 * GB, 4)); RMNode node = resourceManager.getRMContext().getRMNodes().get(nm_0.getNodeId()); @@ -5032,13 +5056,12 @@ public class TestFairScheduler extends FairSchedulerTestBase { Assert.assertEquals(availableResource.getVirtualCores(), 0); } - private org.apache.hadoop.yarn.server.resourcemanager.NodeManager registerNode( - String hostName, int containerManagerPort, int httpPort, String rackName, - Resource capability) throws IOException, YarnException { - org.apache.hadoop.yarn.server.resourcemanager.NodeManager nm = - new org.apache.hadoop.yarn.server.resourcemanager.NodeManager(hostName, - containerManagerPort, httpPort, rackName, capability, - resourceManager); + private NodeManager registerNode(String hostName, int containerManagerPort, + int httpPort, String rackName, + Resource capability) + throws IOException, YarnException { + NodeManager nm = new NodeManager(hostName, containerManagerPort, httpPort, + rackName, capability, resourceManager); // after YARN-5375, scheduler event is processed in rm main dispatcher, // wait it processed, or may lead dead lock diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/TestFifoScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/TestFifoScheduler.java index 156b4ac2774..e772ac73766 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/TestFifoScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/TestFifoScheduler.java @@ -66,6 +66,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.MockAM; import org.apache.hadoop.yarn.server.resourcemanager.MockNM; import org.apache.hadoop.yarn.server.resourcemanager.MockNodes; import org.apache.hadoop.yarn.server.resourcemanager.MockRM; +import org.apache.hadoop.yarn.server.resourcemanager.NodeManager; import org.apache.hadoop.yarn.server.resourcemanager.RMContext; import org.apache.hadoop.yarn.server.resourcemanager.RMContextImpl; import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager; @@ -137,14 +138,11 @@ public class TestFifoScheduler { resourceManager.stop(); } - private org.apache.hadoop.yarn.server.resourcemanager.NodeManager - registerNode(String hostName, int containerManagerPort, int nmHttpPort, - String rackName, Resource capability) throws IOException, - YarnException { - org.apache.hadoop.yarn.server.resourcemanager.NodeManager nm = - new org.apache.hadoop.yarn.server.resourcemanager.NodeManager(hostName, - containerManagerPort, nmHttpPort, rackName, capability, - resourceManager); + private NodeManager registerNode(String hostName, int containerManagerPort, + int nmHttpPort, String rackName, Resource capability) + throws IOException, YarnException { + NodeManager nm = new NodeManager(hostName, containerManagerPort, nmHttpPort, + rackName, capability, resourceManager); NodeAddedSchedulerEvent nodeAddEvent1 = new NodeAddedSchedulerEvent(resourceManager.getRMContext().getRMNodes() .get(nm.getNodeId())); @@ -402,16 +400,14 @@ public class TestFifoScheduler { // Register node1 String host_0 = "host_0"; - org.apache.hadoop.yarn.server.resourcemanager.NodeManager nm_0 = - registerNode(host_0, 1234, 2345, NetworkTopology.DEFAULT_RACK, - Resources.createResource(4 * GB, 1)); + NodeManager nm_0 = registerNode(host_0, 1234, 2345, + NetworkTopology.DEFAULT_RACK, Resources.createResource(4 * GB, 1)); nm_0.heartbeat(); // Register node2 String host_1 = "host_1"; - org.apache.hadoop.yarn.server.resourcemanager.NodeManager nm_1 = - registerNode(host_1, 1234, 2345, NetworkTopology.DEFAULT_RACK, - Resources.createResource(2 * GB, 1)); + NodeManager nm_1 = registerNode(host_1, 1234, 2345, + NetworkTopology.DEFAULT_RACK, Resources.createResource(2 * GB, 1)); nm_1.heartbeat(); // ResourceRequest priorities @@ -1190,6 +1186,30 @@ public class TestFifoScheduler { rm.stop(); } + @Test + public void testRemovedNodeDecomissioningNode() throws Exception { + // Register nodemanager + NodeManager nm = registerNode("host_decom", 1234, 2345, + NetworkTopology.DEFAULT_RACK, Resources.createResource(8 * GB, 4)); + + RMNode node = + resourceManager.getRMContext().getRMNodes().get(nm.getNodeId()); + // Send a heartbeat to kick the tires on the Scheduler + NodeUpdateSchedulerEvent nodeUpdate = new NodeUpdateSchedulerEvent(node); + resourceManager.getResourceScheduler().handle(nodeUpdate); + + // Force remove the node to simulate race condition + ((FifoScheduler) resourceManager.getResourceScheduler()) + .getNodeTracker().removeNode(nm.getNodeId()); + // Kick off another heartbeat with the node state mocked to decommissioning + RMNode spyNode = + Mockito.spy(resourceManager.getRMContext().getRMNodes() + .get(nm.getNodeId())); + when(spyNode.getState()).thenReturn(NodeState.DECOMMISSIONING); + resourceManager.getResourceScheduler().handle( + new NodeUpdateSchedulerEvent(spyNode)); + } + @Test public void testResourceUpdateDecommissioningNode() throws Exception { // Mock the RMNodeResourceUpdate event handler to update SchedulerNode @@ -1215,9 +1235,8 @@ public class TestFifoScheduler { ((AsyncDispatcher) mockDispatcher).start(); // Register node String host_0 = "host_0"; - org.apache.hadoop.yarn.server.resourcemanager.NodeManager nm_0 = - registerNode(host_0, 1234, 2345, NetworkTopology.DEFAULT_RACK, - Resources.createResource(8 * GB, 4)); + NodeManager nm_0 = registerNode(host_0, 1234, 2345, + NetworkTopology.DEFAULT_RACK, Resources.createResource(8 * GB, 4)); // ResourceRequest priorities Priority priority_0 = Priority.newInstance(0); @@ -1283,7 +1302,7 @@ public class TestFifoScheduler { } private void checkNodeResourceUsage(int expected, - org.apache.hadoop.yarn.server.resourcemanager.NodeManager node) { + NodeManager node) { Assert.assertEquals(expected, node.getUsed().getMemorySize()); node.checkResourceUsage(); }