From 93d6ed859e92dc132df2b3ae92fabbe5f958004d Mon Sep 17 00:00:00 2001 From: Robert Kanter Date: Mon, 4 Jun 2018 15:32:03 -0700 Subject: [PATCH] YARN-4677. RMNodeResourceUpdateEvent update from scheduler can lead to race condition (wilfreds and gphillips via rkanter) (cherry picked from commit 0cd145a44390bc1a01113dce4be4e629637c3e8a) --- .../scheduler/AbstractYarnScheduler.java | 19 +++++--- .../scheduler/fair/FairScheduler.java | 2 +- .../scheduler/fifo/FifoScheduler.java | 6 ++- .../capacity/TestCapacityScheduler.java | 44 ++++++++++++++----- .../scheduler/fair/TestFairScheduler.java | 43 +++++++++++++----- .../scheduler/fifo/TestFifoScheduler.java | 44 ++++++++++++++----- 6 files changed, 115 insertions(+), 43 deletions(-) diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java index 18c7b4eb7d9..d2e81a50d94 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java @@ -1106,12 +1106,16 @@ public abstract class AbstractYarnScheduler } // Process new container information + // NOTICE: it is possible to not find the NodeID as a node can be + // decommissioned at the same time. Skip updates if node is null. SchedulerNode schedulerNode = getNode(nm.getNodeID()); List completedContainers = updateNewContainerInfo(nm, schedulerNode); // Notify Scheduler Node updated. - schedulerNode.notifyNodeUpdate(); + if (schedulerNode != null) { + schedulerNode.notifyNodeUpdate(); + } // Process completed containers Resource releasedResources = Resource.newInstance(0, 0); @@ -1121,9 +1125,7 @@ public abstract class AbstractYarnScheduler // If the node is decommissioning, send an update to have the total // resource equal to the used resource, so no available resource to // schedule. - // TODO YARN-5128: Fix possible race-condition when request comes in before - // update is propagated - if (nm.getState() == NodeState.DECOMMISSIONING) { + if (nm.getState() == NodeState.DECOMMISSIONING && schedulerNode != null) { this.rmContext .getDispatcher() .getEventHandler() @@ -1133,13 +1135,16 @@ public abstract class AbstractYarnScheduler } updateSchedulerHealthInformation(releasedResources, releasedContainers); - updateNodeResourceUtilization(nm, schedulerNode); + if (schedulerNode != null) { + updateNodeResourceUtilization(nm, schedulerNode); + } // Now node data structures are up-to-date and ready for scheduling. if(LOG.isDebugEnabled()) { LOG.debug( - "Node being looked for scheduling " + nm + " availableResource: " - + schedulerNode.getUnallocatedResource()); + "Node being looked for scheduling " + nm + " availableResource: " + + (schedulerNode == null ? "unknown (decommissioned)" : + schedulerNode.getUnallocatedResource())); } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java index 1f85814adac..13874bfae2b 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java @@ -1030,7 +1030,7 @@ public class FairScheduler extends return; } - final NodeId nodeID = node.getNodeID(); + final NodeId nodeID = (node != null ? node.getNodeID() : null); if (!nodeTracker.exists(nodeID)) { // The node might have just been removed while this thread was waiting // on the synchronized lock before it entered this synchronized method diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java index 7ac9027a78a..8396db54ad8 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java @@ -966,8 +966,10 @@ public class FifoScheduler extends return; } - if (Resources.greaterThanOrEqual(resourceCalculator, getClusterResource(), - node.getUnallocatedResource(), minimumAllocation)) { + // A decommissioned node might be removed before we get here + if (node != null && + Resources.greaterThanOrEqual(resourceCalculator, getClusterResource(), + node.getUnallocatedResource(), minimumAllocation)) { LOG.debug("Node heartbeat " + nm.getNodeID() + " available resource = " + node.getUnallocatedResource()); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java index 1d2aadcf2a8..0b54010c276 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java @@ -258,14 +258,12 @@ public class TestCapacityScheduler extends CapacitySchedulerTestBase { } } - private NodeManager - registerNode(String hostName, int containerManagerPort, int httpPort, - String rackName, Resource capability) + private NodeManager registerNode(String hostName, int containerManagerPort, + int httpPort, String rackName, + Resource capability) throws IOException, YarnException { - NodeManager nm = - new NodeManager( - hostName, containerManagerPort, httpPort, rackName, capability, - resourceManager); + NodeManager nm = new NodeManager(hostName, containerManagerPort, httpPort, + rackName, capability, resourceManager); NodeAddedSchedulerEvent nodeAddEvent1 = new NodeAddedSchedulerEvent(resourceManager.getRMContext() .getRMNodes().get(nm.getNodeId())); @@ -280,13 +278,13 @@ public class TestCapacityScheduler extends CapacitySchedulerTestBase { // Register node1 String host_0 = "host_0"; - org.apache.hadoop.yarn.server.resourcemanager.NodeManager nm_0 = + NodeManager nm_0 = registerNode(host_0, 1234, 2345, NetworkTopology.DEFAULT_RACK, Resources.createResource(4 * GB, 1)); // Register node2 String host_1 = "host_1"; - org.apache.hadoop.yarn.server.resourcemanager.NodeManager nm_1 = + NodeManager nm_1 = registerNode(host_1, 1234, 2345, NetworkTopology.DEFAULT_RACK, Resources.createResource(2 * GB, 1)); @@ -4038,6 +4036,29 @@ public class TestCapacityScheduler extends CapacitySchedulerTestBase { Assert.fail("Cannot find RMContainer"); } } + @Test + public void testRemovedNodeDecomissioningNode() throws Exception { + // Register nodemanager + NodeManager nm = registerNode("host_decom", 1234, 2345, + NetworkTopology.DEFAULT_RACK, Resources.createResource(8 * GB, 4)); + + RMNode node = + resourceManager.getRMContext().getRMNodes().get(nm.getNodeId()); + // Send a heartbeat to kick the tires on the Scheduler + NodeUpdateSchedulerEvent nodeUpdate = new NodeUpdateSchedulerEvent(node); + resourceManager.getResourceScheduler().handle(nodeUpdate); + + // force remove the node to simulate race condition + ((CapacityScheduler) resourceManager.getResourceScheduler()).getNodeTracker(). + removeNode(nm.getNodeId()); + // Kick off another heartbeat with the node state mocked to decommissioning + RMNode spyNode = + Mockito.spy(resourceManager.getRMContext().getRMNodes() + .get(nm.getNodeId())); + when(spyNode.getState()).thenReturn(NodeState.DECOMMISSIONING); + resourceManager.getResourceScheduler().handle( + new NodeUpdateSchedulerEvent(spyNode)); + } @Test public void testResourceUpdateDecommissioningNode() throws Exception { @@ -4064,9 +4085,8 @@ public class TestCapacityScheduler extends CapacitySchedulerTestBase { ((AsyncDispatcher) mockDispatcher).start(); // Register node String host_0 = "host_0"; - org.apache.hadoop.yarn.server.resourcemanager.NodeManager nm_0 = - registerNode(host_0, 1234, 2345, NetworkTopology.DEFAULT_RACK, - Resources.createResource(8 * GB, 4)); + NodeManager nm_0 = registerNode(host_0, 1234, 2345, + NetworkTopology.DEFAULT_RACK, Resources.createResource(8 * GB, 4)); // ResourceRequest priorities Priority priority_0 = Priority.newInstance(0); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java index d9c06a79db2..3a8e929b6d7 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java @@ -77,6 +77,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.MockAM; import org.apache.hadoop.yarn.server.resourcemanager.MockNM; import org.apache.hadoop.yarn.server.resourcemanager.MockNodes; import org.apache.hadoop.yarn.server.resourcemanager.MockRM; +import org.apache.hadoop.yarn.server.resourcemanager.NodeManager; import org.apache.hadoop.yarn.server.resourcemanager.RMContext; import org.apache.hadoop.yarn.server.resourcemanager.RMContextImpl; import org.apache.hadoop.yarn.server.resourcemanager.recovery.MemoryRMStateStore; @@ -4968,6 +4969,30 @@ public class TestFairScheduler extends FairSchedulerTestBase { .get(attId3.getApplicationId()).getQueue()); } + @Test + public void testRemovedNodeDecomissioningNode() throws Exception { + // Register nodemanager + NodeManager nm = registerNode("host_decom", 1234, 2345, + NetworkTopology.DEFAULT_RACK, Resources.createResource(8 * GB, 4)); + + RMNode node = + resourceManager.getRMContext().getRMNodes().get(nm.getNodeId()); + // Send a heartbeat to kick the tires on the Scheduler + NodeUpdateSchedulerEvent nodeUpdate = new NodeUpdateSchedulerEvent(node); + resourceManager.getResourceScheduler().handle(nodeUpdate); + + // Force remove the node to simulate race condition + ((FairScheduler) resourceManager.getResourceScheduler()) + .getNodeTracker().removeNode(nm.getNodeId()); + // Kick off another heartbeat with the node state mocked to decommissioning + RMNode spyNode = + Mockito.spy(resourceManager.getRMContext().getRMNodes() + .get(nm.getNodeId())); + when(spyNode.getState()).thenReturn(NodeState.DECOMMISSIONING); + resourceManager.getResourceScheduler().handle( + new NodeUpdateSchedulerEvent(spyNode)); + } + @Test public void testResourceUpdateDecommissioningNode() throws Exception { // Mock the RMNodeResourceUpdate event handler to update SchedulerNode @@ -4993,9 +5018,8 @@ public class TestFairScheduler extends FairSchedulerTestBase { ((AsyncDispatcher) mockDispatcher).start(); // Register node String host_0 = "host_0"; - org.apache.hadoop.yarn.server.resourcemanager.NodeManager nm_0 = - registerNode(host_0, 1234, 2345, NetworkTopology.DEFAULT_RACK, - Resources.createResource(8 * GB, 4)); + NodeManager nm_0 = registerNode(host_0, 1234, 2345, + NetworkTopology.DEFAULT_RACK, Resources.createResource(8 * GB, 4)); RMNode node = resourceManager.getRMContext().getRMNodes().get(nm_0.getNodeId()); @@ -5033,13 +5057,12 @@ public class TestFairScheduler extends FairSchedulerTestBase { Assert.assertEquals(availableResource.getVirtualCores(), 0); } - private org.apache.hadoop.yarn.server.resourcemanager.NodeManager registerNode( - String hostName, int containerManagerPort, int httpPort, String rackName, - Resource capability) throws IOException, YarnException { - org.apache.hadoop.yarn.server.resourcemanager.NodeManager nm = - new org.apache.hadoop.yarn.server.resourcemanager.NodeManager(hostName, - containerManagerPort, httpPort, rackName, capability, - resourceManager); + private NodeManager registerNode(String hostName, int containerManagerPort, + int httpPort, String rackName, + Resource capability) + throws IOException, YarnException { + NodeManager nm = new NodeManager(hostName, containerManagerPort, httpPort, + rackName, capability, resourceManager); // after YARN-5375, scheduler event is processed in rm main dispatcher, // wait it processed, or may lead dead lock diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/TestFifoScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/TestFifoScheduler.java index 8814c0e542d..ee66a49032f 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/TestFifoScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/TestFifoScheduler.java @@ -66,6 +66,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.MockAM; import org.apache.hadoop.yarn.server.resourcemanager.MockNM; import org.apache.hadoop.yarn.server.resourcemanager.MockNodes; import org.apache.hadoop.yarn.server.resourcemanager.MockRM; +import org.apache.hadoop.yarn.server.resourcemanager.NodeManager; import org.apache.hadoop.yarn.server.resourcemanager.RMContext; import org.apache.hadoop.yarn.server.resourcemanager.RMContextImpl; import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager; @@ -138,14 +139,12 @@ public class TestFifoScheduler { resourceManager.stop(); } - private org.apache.hadoop.yarn.server.resourcemanager.NodeManager - registerNode(String hostName, int containerManagerPort, int nmHttpPort, - String rackName, Resource capability) throws IOException, - YarnException { - org.apache.hadoop.yarn.server.resourcemanager.NodeManager nm = - new org.apache.hadoop.yarn.server.resourcemanager.NodeManager(hostName, - containerManagerPort, nmHttpPort, rackName, capability, - resourceManager); + private NodeManager registerNode(String hostName, int containerManagerPort, + int nmHttpPort, String rackName, + Resource capability) + throws IOException, YarnException { + NodeManager nm = new NodeManager(hostName, containerManagerPort, + nmHttpPort, rackName, capability, resourceManager); NodeAddedSchedulerEvent nodeAddEvent1 = new NodeAddedSchedulerEvent(resourceManager.getRMContext().getRMNodes() .get(nm.getNodeId())); @@ -1195,6 +1194,30 @@ public class TestFifoScheduler { rm.stop(); } + @Test + public void testRemovedNodeDecomissioningNode() throws Exception { + // Register nodemanager + NodeManager nm = registerNode("host_decom", 1234, 2345, + NetworkTopology.DEFAULT_RACK, Resources.createResource(8 * GB, 4)); + + RMNode node = + resourceManager.getRMContext().getRMNodes().get(nm.getNodeId()); + // Send a heartbeat to kick the tires on the Scheduler + NodeUpdateSchedulerEvent nodeUpdate = new NodeUpdateSchedulerEvent(node); + resourceManager.getResourceScheduler().handle(nodeUpdate); + + // Force remove the node to simulate race condition + ((FifoScheduler) resourceManager.getResourceScheduler()) + .getNodeTracker().removeNode(nm.getNodeId()); + // Kick off another heartbeat with the node state mocked to decommissioning + RMNode spyNode = + Mockito.spy(resourceManager.getRMContext().getRMNodes() + .get(nm.getNodeId())); + when(spyNode.getState()).thenReturn(NodeState.DECOMMISSIONING); + resourceManager.getResourceScheduler().handle( + new NodeUpdateSchedulerEvent(spyNode)); + } + @Test public void testResourceUpdateDecommissioningNode() throws Exception { // Mock the RMNodeResourceUpdate event handler to update SchedulerNode @@ -1220,9 +1243,8 @@ public class TestFifoScheduler { ((AsyncDispatcher) mockDispatcher).start(); // Register node String host_0 = "host_0"; - org.apache.hadoop.yarn.server.resourcemanager.NodeManager nm_0 = - registerNode(host_0, 1234, 2345, NetworkTopology.DEFAULT_RACK, - Resources.createResource(8 * GB, 4)); + NodeManager nm_0 = registerNode(host_0, 1234, 2345, + NetworkTopology.DEFAULT_RACK, Resources.createResource(8 * GB, 4)); // ResourceRequest priorities Priority priority_0 = Priority.newInstance(0);