diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/OpportunisticContainerAllocatorAMService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/OpportunisticContainerAllocatorAMService.java index 1f83127c183..ada1a637fcd 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/OpportunisticContainerAllocatorAMService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/OpportunisticContainerAllocatorAMService.java @@ -57,6 +57,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEven import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerImpl; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnScheduler; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.distributed.NodeQueueLoadMonitor; @@ -409,15 +410,19 @@ public class OpportunisticContainerAllocatorAMService private List convertToRemoteNodes(List nodeIds) { ArrayList retNodes = new ArrayList<>(); for (NodeId nId : nodeIds) { - retNodes.add(convertToRemoteNode(nId)); + RemoteNode remoteNode = convertToRemoteNode(nId); + if (null != remoteNode) { + retNodes.add(remoteNode); + } } return retNodes; } private RemoteNode convertToRemoteNode(NodeId nodeId) { - return RemoteNode.newInstance(nodeId, - ((AbstractYarnScheduler)rmContext.getScheduler()).getNode(nodeId) - .getHttpAddress()); + SchedulerNode node = + ((AbstractYarnScheduler) rmContext.getScheduler()).getNode(nodeId); + return node != null ? RemoteNode.newInstance(nodeId, node.getHttpAddress()) + : null; } private Resource createMaxContainerResource() { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/distributed/NodeQueueLoadMonitor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/distributed/NodeQueueLoadMonitor.java index 232b4ad711a..dec55ca955f 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/distributed/NodeQueueLoadMonitor.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/distributed/NodeQueueLoadMonitor.java @@ -165,9 +165,11 @@ public class NodeQueueLoadMonitor implements ClusterMonitor { } @Override - public void addNode(List containerStatuses, RMNode - rmNode) { - LOG.debug("Node added event from: " + rmNode.getNode().getName()); + public void addNode(List containerStatuses, + RMNode rmNode) { + if (LOG.isDebugEnabled()) { + LOG.debug("Node added event from: " + rmNode.getNode().getName()); + } // Ignoring this currently : at least one NODE_UPDATE heartbeat is // required to ensure node eligibility. } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestOpportunisticContainerAllocatorAMService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestOpportunisticContainerAllocatorAMService.java index deaee3f1669..4ed92f8a9d1 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestOpportunisticContainerAllocatorAMService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestOpportunisticContainerAllocatorAMService.java @@ -61,11 +61,23 @@ import org.apache.hadoop.yarn.server.api.protocolrecords.RemoteNode; import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.DistributedSchedulingAllocateRequestPBImpl; import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.DistributedSchedulingAllocateResponsePBImpl; import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.RegisterDistributedSchedulingAMResponsePBImpl; +import org.apache.hadoop.yarn.server.api.records.OpportunisticContainersStatus; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.AMLivelinessMonitor; - +import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; +import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeImpl; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeAddedSchedulerEvent; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeRemovedSchedulerEvent; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateSchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager; +import org.apache.hadoop.yarn.server.scheduler.OpportunisticContainerContext; +import org.apache.hadoop.yarn.util.resource.Resources; import org.junit.Assert; import org.junit.Test; +import org.mockito.Mockito; import java.io.IOException; import java.net.InetSocketAddress; @@ -77,6 +89,89 @@ import java.util.List; */ public class TestOpportunisticContainerAllocatorAMService { + private static final int GB = 1024; + + @Test(timeout = 60000) + public void testNodeRemovalDuringAllocate() throws Exception { + CapacitySchedulerConfiguration csConf = + new CapacitySchedulerConfiguration(); + YarnConfiguration conf = new YarnConfiguration(csConf); + conf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class, + ResourceScheduler.class); + conf.setBoolean( + YarnConfiguration.OPPORTUNISTIC_CONTAINER_ALLOCATION_ENABLED, true); + conf.setInt( + YarnConfiguration.NM_CONTAINER_QUEUING_SORTING_NODES_INTERVAL_MS, 100); + MockRM rm = new MockRM(conf); + rm.start(); + MockNM nm1 = new MockNM("h1:1234", 4096, rm.getResourceTrackerService()); + MockNM nm2 = new MockNM("h2:1234", 4096, rm.getResourceTrackerService()); + nm1.registerNode(); + nm2.registerNode(); + OpportunisticContainerAllocatorAMService amservice = + (OpportunisticContainerAllocatorAMService) rm + .getApplicationMasterService(); + RMApp app1 = rm.submitApp(1 * GB, "app", "user", null, "default"); + ApplicationAttemptId attemptId = + app1.getCurrentAppAttempt().getAppAttemptId(); + MockAM am1 = MockRM.launchAndRegisterAM(app1, rm, nm2); + ResourceScheduler scheduler = rm.getResourceScheduler(); + RMNode rmNode1 = rm.getRMContext().getRMNodes().get(nm1.getNodeId()); + RMNode rmNode2 = rm.getRMContext().getRMNodes().get(nm2.getNodeId()); + nm1.nodeHeartbeat(true); + nm2.nodeHeartbeat(true); + ((RMNodeImpl) rmNode1) + .setOpportunisticContainersStatus(getOppurtunisticStatus(-1, 100)); + ((RMNodeImpl) rmNode2) + .setOpportunisticContainersStatus(getOppurtunisticStatus(-1, 100)); + OpportunisticContainerContext ctxt = ((CapacityScheduler) scheduler) + .getApplicationAttempt(attemptId).getOpportunisticContainerContext(); + // Send add and update node events to AM Service. + amservice.handle(new NodeAddedSchedulerEvent(rmNode1)); + amservice.handle(new NodeAddedSchedulerEvent(rmNode2)); + amservice.handle(new NodeUpdateSchedulerEvent(rmNode1)); + amservice.handle(new NodeUpdateSchedulerEvent(rmNode2)); + // Both node 1 and node 2 will be applicable for scheduling. + for (int i = 0; i < 10; i++) { + am1.allocate( + Arrays.asList(ResourceRequest.newInstance(Priority.newInstance(1), + "*", Resources.createResource(1 * GB), 2)), + null); + if (ctxt.getNodeMap().size() == 2) { + break; + } + Thread.sleep(50); + } + Assert.assertEquals(2, ctxt.getNodeMap().size()); + // Remove node from scheduler but not from AM Service. + scheduler.handle(new NodeRemovedSchedulerEvent(rmNode1)); + // After removal of node 1, only 1 node will be applicable for scheduling. + for (int i = 0; i < 10; i++) { + try { + am1.allocate( + Arrays.asList(ResourceRequest.newInstance(Priority.newInstance(1), + "*", Resources.createResource(1 * GB), 2)), + null); + } catch (Exception e) { + Assert.fail("Allocate request should be handled on node removal"); + } + if (ctxt.getNodeMap().size() == 1) { + break; + } + Thread.sleep(50); + } + Assert.assertEquals(1, ctxt.getNodeMap().size()); + } + + private OpportunisticContainersStatus getOppurtunisticStatus(int waitTime, + int queueLength) { + OpportunisticContainersStatus status1 = + Mockito.mock(OpportunisticContainersStatus.class); + Mockito.when(status1.getEstimatedQueueWaitTime()).thenReturn(waitTime); + Mockito.when(status1.getWaitQueueLength()).thenReturn(queueLength); + return status1; + } + // Test if the OpportunisticContainerAllocatorAMService can handle both // DSProtocol as well as AMProtocol clients @Test