YARN-4677. RMNodeResourceUpdateEvent update from scheduler can lead to race condition (wilfreds and gphillips via rkanter)

This commit is contained in:
Robert Kanter 2018-06-04 15:59:27 -07:00
parent 0caf40efe5
commit f97bd6bb7f
6 changed files with 122 additions and 55 deletions

View File

@ -1087,6 +1087,10 @@ public abstract class AbstractYarnScheduler
// Process new container information // Process new container information
List<ContainerStatus> completedContainers = updateNewContainerInfo(nm); List<ContainerStatus> completedContainers = updateNewContainerInfo(nm);
// NOTICE: it is possible to not find the NodeID as a node can be
// decommissioned at the same time. Skip updates if node is null.
SchedulerNode schedulerNode = getNode(nm.getNodeID());
// Process completed containers // Process completed containers
Resource releasedResources = Resource.newInstance(0, 0); Resource releasedResources = Resource.newInstance(0, 0);
int releasedContainers = updateCompletedContainers(completedContainers, int releasedContainers = updateCompletedContainers(completedContainers,
@ -1095,26 +1099,26 @@ public abstract class AbstractYarnScheduler
// If the node is decommissioning, send an update to have the total // If the node is decommissioning, send an update to have the total
// resource equal to the used resource, so no available resource to // resource equal to the used resource, so no available resource to
// schedule. // schedule.
// TODO YARN-5128: Fix possible race-condition when request comes in before if (nm.getState() == NodeState.DECOMMISSIONING && schedulerNode != null) {
// update is propagated
if (nm.getState() == NodeState.DECOMMISSIONING) {
this.rmContext this.rmContext
.getDispatcher() .getDispatcher()
.getEventHandler() .getEventHandler()
.handle( .handle(
new RMNodeResourceUpdateEvent(nm.getNodeID(), ResourceOption new RMNodeResourceUpdateEvent(nm.getNodeID(), ResourceOption
.newInstance(getSchedulerNode(nm.getNodeID()) .newInstance(schedulerNode.getAllocatedResource(), 0)));
.getAllocatedResource(), 0)));
} }
updateSchedulerHealthInformation(releasedResources, releasedContainers); updateSchedulerHealthInformation(releasedResources, releasedContainers);
if (schedulerNode != null) {
updateNodeResourceUtilization(nm); updateNodeResourceUtilization(nm);
}
// Now node data structures are up-to-date and ready for scheduling. // Now node data structures are up-to-date and ready for scheduling.
if(LOG.isDebugEnabled()) { if(LOG.isDebugEnabled()) {
SchedulerNode node = getNode(nm.getNodeID()); LOG.debug(
LOG.debug("Node being looked for scheduling " + nm + "Node being looked for scheduling " + nm + " availableResource: " +
" availableResource: " + node.getUnallocatedResource()); (schedulerNode == null ? "unknown (decomissioned)" :
schedulerNode.getUnallocatedResource()));
} }
} }

View File

@ -1009,7 +1009,7 @@ public class FairScheduler extends
return; return;
} }
final NodeId nodeID = node.getNodeID(); final NodeId nodeID = (node != null ? node.getNodeID() : null);
if (!nodeTracker.exists(nodeID)) { if (!nodeTracker.exists(nodeID)) {
// The node might have just been removed while this thread was waiting // The node might have just been removed while this thread was waiting
// on the synchronized lock before it entered this synchronized method // on the synchronized lock before it entered this synchronized method

View File

@ -970,7 +970,9 @@ public class FifoScheduler extends
return; return;
} }
if (Resources.greaterThanOrEqual(resourceCalculator, getClusterResource(), // A decommissioned node might be removed before we get here
if (node != null &&
Resources.greaterThanOrEqual(resourceCalculator, getClusterResource(),
node.getUnallocatedResource(), minimumAllocation)) { node.getUnallocatedResource(), minimumAllocation)) {
LOG.debug("Node heartbeat " + nm.getNodeID() + LOG.debug("Node heartbeat " + nm.getNodeID() +
" available resource = " + node.getUnallocatedResource()); " available resource = " + node.getUnallocatedResource());

View File

@ -280,14 +280,11 @@ public class TestCapacityScheduler {
} }
} }
private NodeManager private NodeManager registerNode(String hostName, int containerManagerPort,
registerNode(String hostName, int containerManagerPort, int httpPort, int httpPort, String rackName, Resource capability)
String rackName, Resource capability)
throws IOException, YarnException { throws IOException, YarnException {
NodeManager nm = NodeManager nm = new NodeManager(hostName, containerManagerPort, httpPort,
new NodeManager( rackName, capability, resourceManager);
hostName, containerManagerPort, httpPort, rackName, capability,
resourceManager);
NodeAddedSchedulerEvent nodeAddEvent1 = NodeAddedSchedulerEvent nodeAddEvent1 =
new NodeAddedSchedulerEvent(resourceManager.getRMContext() new NodeAddedSchedulerEvent(resourceManager.getRMContext()
.getRMNodes().get(nm.getNodeId())); .getRMNodes().get(nm.getNodeId()));
@ -302,15 +299,13 @@ public class TestCapacityScheduler {
// Register node1 // Register node1
String host_0 = "host_0"; String host_0 = "host_0";
org.apache.hadoop.yarn.server.resourcemanager.NodeManager nm_0 = NodeManager nm_0 = registerNode(host_0, 1234, 2345,
registerNode(host_0, 1234, 2345, NetworkTopology.DEFAULT_RACK, NetworkTopology.DEFAULT_RACK, Resources.createResource(4 * GB, 1));
Resources.createResource(4 * GB, 1));
// Register node2 // Register node2
String host_1 = "host_1"; String host_1 = "host_1";
org.apache.hadoop.yarn.server.resourcemanager.NodeManager nm_1 = NodeManager nm_1 = registerNode(host_1, 1234, 2345,
registerNode(host_1, 1234, 2345, NetworkTopology.DEFAULT_RACK, NetworkTopology.DEFAULT_RACK, Resources.createResource(2 * GB, 1));
Resources.createResource(2 * GB, 1));
// ResourceRequest priorities // ResourceRequest priorities
Priority priority_0 = Priority.newInstance(0); Priority priority_0 = Priority.newInstance(0);
@ -4084,6 +4079,31 @@ public class TestCapacityScheduler {
} }
} }
/**
 * Sends a node update for a node that has already been removed from the
 * scheduler's node tracker while its state is reported as DECOMMISSIONING.
 * The test makes no assertions; it passes when handling the update does not
 * throw.
 */
@Test
public void testRemovedNodeDecomissioningNode() throws Exception {
  // Bring up a node manager and look up the RMNode registered for it.
  NodeManager decomNm = registerNode("host_decom", 1234, 2345,
      NetworkTopology.DEFAULT_RACK, Resources.createResource(8 * GB, 4));
  RMNode rmNode = resourceManager.getRMContext().getRMNodes()
      .get(decomNm.getNodeId());

  // First heartbeat: the scheduler handles an update for the known node.
  resourceManager.getResourceScheduler()
      .handle(new NodeUpdateSchedulerEvent(rmNode));

  // Drop the node from the node tracker directly to simulate the race
  // between node removal and a late heartbeat.
  CapacityScheduler scheduler =
      (CapacityScheduler) resourceManager.getResourceScheduler();
  scheduler.getNodeTracker().removeNode(decomNm.getNodeId());

  // Second heartbeat: same RMNode, but with its state stubbed to
  // DECOMMISSIONING.
  RMNode registered = resourceManager.getRMContext().getRMNodes()
      .get(decomNm.getNodeId());
  RMNode decommissioningNode = Mockito.spy(registered);
  when(decommissioningNode.getState()).thenReturn(NodeState.DECOMMISSIONING);
  scheduler.handle(new NodeUpdateSchedulerEvent(decommissioningNode));
}
@Test @Test
public void testResourceUpdateDecommissioningNode() throws Exception { public void testResourceUpdateDecommissioningNode() throws Exception {
// Mock the RMNodeResourceUpdate event handler to update SchedulerNode // Mock the RMNodeResourceUpdate event handler to update SchedulerNode
@ -4109,9 +4129,8 @@ public class TestCapacityScheduler {
((AsyncDispatcher) mockDispatcher).start(); ((AsyncDispatcher) mockDispatcher).start();
// Register node // Register node
String host_0 = "host_0"; String host_0 = "host_0";
org.apache.hadoop.yarn.server.resourcemanager.NodeManager nm_0 = NodeManager nm_0 = registerNode(host_0, 1234, 2345,
registerNode(host_0, 1234, 2345, NetworkTopology.DEFAULT_RACK, NetworkTopology.DEFAULT_RACK, Resources.createResource(8 * GB, 4));
Resources.createResource(8 * GB, 4));
// ResourceRequest priorities // ResourceRequest priorities
Priority priority_0 = Priority.newInstance(0); Priority priority_0 = Priority.newInstance(0);

View File

@ -77,6 +77,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.MockAM;
import org.apache.hadoop.yarn.server.resourcemanager.MockNM; import org.apache.hadoop.yarn.server.resourcemanager.MockNM;
import org.apache.hadoop.yarn.server.resourcemanager.MockNodes; import org.apache.hadoop.yarn.server.resourcemanager.MockNodes;
import org.apache.hadoop.yarn.server.resourcemanager.MockRM; import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
import org.apache.hadoop.yarn.server.resourcemanager.NodeManager;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext; import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import org.apache.hadoop.yarn.server.resourcemanager.RMContextImpl; import org.apache.hadoop.yarn.server.resourcemanager.RMContextImpl;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.MemoryRMStateStore; import org.apache.hadoop.yarn.server.resourcemanager.recovery.MemoryRMStateStore;
@ -4967,6 +4968,30 @@ public class TestFairScheduler extends FairSchedulerTestBase {
.get(attId3.getApplicationId()).getQueue()); .get(attId3.getApplicationId()).getQueue());
} }
/**
 * Sends a node update for a node that has already been removed from the
 * scheduler's node tracker while its state is reported as DECOMMISSIONING.
 * The test makes no assertions; it passes when handling the update does not
 * throw.
 */
@Test
public void testRemovedNodeDecomissioningNode() throws Exception {
  // Bring up a node manager and look up the RMNode registered for it.
  NodeManager decomNm = registerNode("host_decom", 1234, 2345,
      NetworkTopology.DEFAULT_RACK, Resources.createResource(8 * GB, 4));
  RMNode rmNode = resourceManager.getRMContext().getRMNodes()
      .get(decomNm.getNodeId());

  // First heartbeat: the scheduler handles an update for the known node.
  resourceManager.getResourceScheduler()
      .handle(new NodeUpdateSchedulerEvent(rmNode));

  // Drop the node from the node tracker directly to simulate the race
  // between node removal and a late heartbeat.
  FairScheduler scheduler =
      (FairScheduler) resourceManager.getResourceScheduler();
  scheduler.getNodeTracker().removeNode(decomNm.getNodeId());

  // Second heartbeat: same RMNode, but with its state stubbed to
  // DECOMMISSIONING.
  RMNode registered = resourceManager.getRMContext().getRMNodes()
      .get(decomNm.getNodeId());
  RMNode decommissioningNode = Mockito.spy(registered);
  when(decommissioningNode.getState()).thenReturn(NodeState.DECOMMISSIONING);
  scheduler.handle(new NodeUpdateSchedulerEvent(decommissioningNode));
}
@Test @Test
public void testResourceUpdateDecommissioningNode() throws Exception { public void testResourceUpdateDecommissioningNode() throws Exception {
// Mock the RMNodeResourceUpdate event handler to update SchedulerNode // Mock the RMNodeResourceUpdate event handler to update SchedulerNode
@ -4992,9 +5017,8 @@ public class TestFairScheduler extends FairSchedulerTestBase {
((AsyncDispatcher) mockDispatcher).start(); ((AsyncDispatcher) mockDispatcher).start();
// Register node // Register node
String host_0 = "host_0"; String host_0 = "host_0";
org.apache.hadoop.yarn.server.resourcemanager.NodeManager nm_0 = NodeManager nm_0 = registerNode(host_0, 1234, 2345,
registerNode(host_0, 1234, 2345, NetworkTopology.DEFAULT_RACK, NetworkTopology.DEFAULT_RACK, Resources.createResource(8 * GB, 4));
Resources.createResource(8 * GB, 4));
RMNode node = RMNode node =
resourceManager.getRMContext().getRMNodes().get(nm_0.getNodeId()); resourceManager.getRMContext().getRMNodes().get(nm_0.getNodeId());
@ -5032,13 +5056,12 @@ public class TestFairScheduler extends FairSchedulerTestBase {
Assert.assertEquals(availableResource.getVirtualCores(), 0); Assert.assertEquals(availableResource.getVirtualCores(), 0);
} }
private org.apache.hadoop.yarn.server.resourcemanager.NodeManager registerNode( private NodeManager registerNode(String hostName, int containerManagerPort,
String hostName, int containerManagerPort, int httpPort, String rackName, int httpPort, String rackName,
Resource capability) throws IOException, YarnException { Resource capability)
org.apache.hadoop.yarn.server.resourcemanager.NodeManager nm = throws IOException, YarnException {
new org.apache.hadoop.yarn.server.resourcemanager.NodeManager(hostName, NodeManager nm = new NodeManager(hostName, containerManagerPort, httpPort,
containerManagerPort, httpPort, rackName, capability, rackName, capability, resourceManager);
resourceManager);
// After YARN-5375, scheduler events are processed in the RM main dispatcher;
// wait until they have been processed, otherwise this may lead to a deadlock.

View File

@ -66,6 +66,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.MockAM;
import org.apache.hadoop.yarn.server.resourcemanager.MockNM; import org.apache.hadoop.yarn.server.resourcemanager.MockNM;
import org.apache.hadoop.yarn.server.resourcemanager.MockNodes; import org.apache.hadoop.yarn.server.resourcemanager.MockNodes;
import org.apache.hadoop.yarn.server.resourcemanager.MockRM; import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
import org.apache.hadoop.yarn.server.resourcemanager.NodeManager;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext; import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import org.apache.hadoop.yarn.server.resourcemanager.RMContextImpl; import org.apache.hadoop.yarn.server.resourcemanager.RMContextImpl;
import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager; import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
@ -137,14 +138,11 @@ public class TestFifoScheduler {
resourceManager.stop(); resourceManager.stop();
} }
private org.apache.hadoop.yarn.server.resourcemanager.NodeManager private NodeManager registerNode(String hostName, int containerManagerPort,
registerNode(String hostName, int containerManagerPort, int nmHttpPort, int nmHttpPort, String rackName, Resource capability)
String rackName, Resource capability) throws IOException, throws IOException, YarnException {
YarnException { NodeManager nm = new NodeManager(hostName, containerManagerPort, nmHttpPort,
org.apache.hadoop.yarn.server.resourcemanager.NodeManager nm = rackName, capability, resourceManager);
new org.apache.hadoop.yarn.server.resourcemanager.NodeManager(hostName,
containerManagerPort, nmHttpPort, rackName, capability,
resourceManager);
NodeAddedSchedulerEvent nodeAddEvent1 = NodeAddedSchedulerEvent nodeAddEvent1 =
new NodeAddedSchedulerEvent(resourceManager.getRMContext().getRMNodes() new NodeAddedSchedulerEvent(resourceManager.getRMContext().getRMNodes()
.get(nm.getNodeId())); .get(nm.getNodeId()));
@ -402,16 +400,14 @@ public class TestFifoScheduler {
// Register node1 // Register node1
String host_0 = "host_0"; String host_0 = "host_0";
org.apache.hadoop.yarn.server.resourcemanager.NodeManager nm_0 = NodeManager nm_0 = registerNode(host_0, 1234, 2345,
registerNode(host_0, 1234, 2345, NetworkTopology.DEFAULT_RACK, NetworkTopology.DEFAULT_RACK, Resources.createResource(4 * GB, 1));
Resources.createResource(4 * GB, 1));
nm_0.heartbeat(); nm_0.heartbeat();
// Register node2 // Register node2
String host_1 = "host_1"; String host_1 = "host_1";
org.apache.hadoop.yarn.server.resourcemanager.NodeManager nm_1 = NodeManager nm_1 = registerNode(host_1, 1234, 2345,
registerNode(host_1, 1234, 2345, NetworkTopology.DEFAULT_RACK, NetworkTopology.DEFAULT_RACK, Resources.createResource(2 * GB, 1));
Resources.createResource(2 * GB, 1));
nm_1.heartbeat(); nm_1.heartbeat();
// ResourceRequest priorities // ResourceRequest priorities
@ -1190,6 +1186,30 @@ public class TestFifoScheduler {
rm.stop(); rm.stop();
} }
/**
 * Sends a node update for a node that has already been removed from the
 * scheduler's node tracker while its state is reported as DECOMMISSIONING.
 * The test makes no assertions; it passes when handling the update does not
 * throw.
 */
@Test
public void testRemovedNodeDecomissioningNode() throws Exception {
  // Bring up a node manager and look up the RMNode registered for it.
  NodeManager decomNm = registerNode("host_decom", 1234, 2345,
      NetworkTopology.DEFAULT_RACK, Resources.createResource(8 * GB, 4));
  RMNode rmNode = resourceManager.getRMContext().getRMNodes()
      .get(decomNm.getNodeId());

  // First heartbeat: the scheduler handles an update for the known node.
  resourceManager.getResourceScheduler()
      .handle(new NodeUpdateSchedulerEvent(rmNode));

  // Drop the node from the node tracker directly to simulate the race
  // between node removal and a late heartbeat.
  FifoScheduler scheduler =
      (FifoScheduler) resourceManager.getResourceScheduler();
  scheduler.getNodeTracker().removeNode(decomNm.getNodeId());

  // Second heartbeat: same RMNode, but with its state stubbed to
  // DECOMMISSIONING.
  RMNode registered = resourceManager.getRMContext().getRMNodes()
      .get(decomNm.getNodeId());
  RMNode decommissioningNode = Mockito.spy(registered);
  when(decommissioningNode.getState()).thenReturn(NodeState.DECOMMISSIONING);
  scheduler.handle(new NodeUpdateSchedulerEvent(decommissioningNode));
}
@Test @Test
public void testResourceUpdateDecommissioningNode() throws Exception { public void testResourceUpdateDecommissioningNode() throws Exception {
// Mock the RMNodeResourceUpdate event handler to update SchedulerNode // Mock the RMNodeResourceUpdate event handler to update SchedulerNode
@ -1215,9 +1235,8 @@ public class TestFifoScheduler {
((AsyncDispatcher) mockDispatcher).start(); ((AsyncDispatcher) mockDispatcher).start();
// Register node // Register node
String host_0 = "host_0"; String host_0 = "host_0";
org.apache.hadoop.yarn.server.resourcemanager.NodeManager nm_0 = NodeManager nm_0 = registerNode(host_0, 1234, 2345,
registerNode(host_0, 1234, 2345, NetworkTopology.DEFAULT_RACK, NetworkTopology.DEFAULT_RACK, Resources.createResource(8 * GB, 4));
Resources.createResource(8 * GB, 4));
// ResourceRequest priorities // ResourceRequest priorities
Priority priority_0 = Priority.newInstance(0); Priority priority_0 = Priority.newInstance(0);
@ -1283,7 +1302,7 @@ public class TestFifoScheduler {
} }
private void checkNodeResourceUsage(int expected, private void checkNodeResourceUsage(int expected,
org.apache.hadoop.yarn.server.resourcemanager.NodeManager node) { NodeManager node) {
Assert.assertEquals(expected, node.getUsed().getMemorySize()); Assert.assertEquals(expected, node.getUsed().getMemorySize());
node.checkResourceUsage(); node.checkResourceUsage();
} }