diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java index 0594207b933..d296854987b 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java @@ -1302,6 +1302,13 @@ public class RMNodeImpl implements RMNode, EventHandler { .handle( new NodeResourceUpdateSchedulerEvent(rmNode, ResourceOption .newInstance(rmNode.totalCapability, 0))); + + // Notify NodesListManager to notify all RMApp that this node has been + // recommissioned so that each Application Master can take any required + // actions. + rmNode.context.getDispatcher().getEventHandler().handle( + new NodesListManagerEvent( + NodesListManagerEventType.NODE_USABLE, rmNode)); } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestAMRMRPCNodeUpdates.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestAMRMRPCNodeUpdates.java index dd9bcd08b49..c846da10518 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestAMRMRPCNodeUpdates.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestAMRMRPCNodeUpdates.java @@ -39,6 +39,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.MockNM; import org.apache.hadoop.yarn.server.resourcemanager.MockRM; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt; +import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeEventType; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration; import org.junit.After; import org.junit.Before; @@ -90,6 +91,12 @@ public class TestAMRMRPCNodeUpdates { rm.drainEvents(); } + private void syncNodeRecommissioning(MockNM nm) throws Exception { + rm.sendNodeEvent(nm, RMNodeEventType.RECOMMISSION); + rm.waitForState(nm.getNodeId(), NodeState.RUNNING); + rm.drainEvents(); + } + private AllocateResponse allocate(final ApplicationAttemptId attemptId, final AllocateRequest req) throws Exception { UserGroupInformation ugi = @@ -139,6 +146,53 @@ public class TestAMRMRPCNodeUpdates { NodeUpdateType.NODE_DECOMMISSIONING, nr.getNodeUpdateType()); } + @Test + public void testAMRMRecommissioningNodes() throws Exception { + MockNM nm1 = rm.registerNode("127.0.0.1:1234", 10000); + MockNM nm2 = rm.registerNode("127.0.0.2:1234", 10000); + rm.drainEvents(); + + RMApp app1 = rm.submitApp(2000); + + // Trigger the scheduling so the AM gets 'launched' on nm1 + nm1.nodeHeartbeat(true); + + RMAppAttempt attempt1 = app1.getCurrentAppAttempt(); + MockAM am1 = rm.sendAMLaunched(attempt1.getAppAttemptId()); + + // register AM returns no unusable node + am1.registerAppAttempt(); + + // DECOMMISSION nm2 + Integer decommissioningTimeout = 600; + syncNodeGracefulDecommission(nm2, decommissioningTimeout); + + AllocateRequest allocateRequest1 = + AllocateRequest.newInstance(0, 0F, null, null, null); + AllocateResponse response1 = + allocate(attempt1.getAppAttemptId(), allocateRequest1); + List updatedNodes = response1.getUpdatedNodes(); + Assert.assertEquals(1, updatedNodes.size()); + NodeReport nr = updatedNodes.iterator().next(); + Assert.assertEquals( + decommissioningTimeout, nr.getDecommissioningTimeout()); + Assert.assertEquals( + NodeUpdateType.NODE_DECOMMISSIONING, nr.getNodeUpdateType()); + + // Wait for nm2 to RECOMMISSION + syncNodeRecommissioning(nm2); + + AllocateRequest allocateRequest2 = AllocateRequest + .newInstance(response1.getResponseId(), 0F, null, null, null); + AllocateResponse response2 = + allocate(attempt1.getAppAttemptId(), allocateRequest2); + List updatedNodes2 = response2.getUpdatedNodes(); + Assert.assertEquals(1, updatedNodes2.size()); + NodeReport nr2 = updatedNodes2.iterator().next(); + Assert.assertEquals( + NodeUpdateType.NODE_USABLE, nr2.getNodeUpdateType()); + } + @Test public void testAMRMUnusableNodes() throws Exception {