From a337f0e3549351344bce70cb23ddc0a256c894b0 Mon Sep 17 00:00:00 2001 From: Jason Lowe Date: Thu, 18 Sep 2014 21:34:40 +0000 Subject: [PATCH] YARN-2561. MR job client cannot reconnect to AM after NM restart. Contributed by Junping Du --- hadoop-yarn-project/CHANGES.txt | 3 ++ .../resourcemanager/rmnode/RMNodeImpl.java | 47 ++++++++++++++++--- .../TestResourceTrackerService.java | 11 +++++ 3 files changed, 55 insertions(+), 6 deletions(-) diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt index fbbef7f5b0f..836248bafaf 100644 --- a/hadoop-yarn-project/CHANGES.txt +++ b/hadoop-yarn-project/CHANGES.txt @@ -391,6 +391,9 @@ Release 2.6.0 - UNRELEASED YARN-2363. Submitted applications occasionally lack a tracking URL (jlowe) + YARN-2561. MR job client cannot reconnect to AM after NM restart. (Junping + Du via jlowe) + Release 2.5.1 - 2014-09-05 INCOMPATIBLE CHANGES diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java index 3ce641662cc..12659587e5d 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java @@ -544,12 +544,47 @@ public class RMNodeImpl implements RMNode, EventHandler { RMNodeReconnectEvent reconnectEvent = (RMNodeReconnectEvent) event; RMNode newNode = reconnectEvent.getReconnectedNode(); rmNode.nodeManagerVersion = newNode.getNodeManagerVersion(); - rmNode.httpPort = newNode.getHttpPort(); - rmNode.httpAddress = newNode.getHttpAddress(); - rmNode.totalCapability = newNode.getTotalCapability(); + List runningApps = reconnectEvent.getRunningApplications(); + boolean noRunningApps = + (runningApps == null) || (runningApps.size() == 0); - // Reset heartbeat ID since node just restarted. - rmNode.getLastNodeHeartBeatResponse().setResponseId(0); + // No application running on the node, so send node-removal event with + // cleaning up old container info. + if (noRunningApps) { + rmNode.nodeUpdateQueue.clear(); + rmNode.context.getDispatcher().getEventHandler().handle( + new NodeRemovedSchedulerEvent(rmNode)); + + if (rmNode.getHttpPort() == newNode.getHttpPort()) { + // Reset heartbeat ID since node just restarted. + rmNode.getLastNodeHeartBeatResponse().setResponseId(0); + if (rmNode.getState() != NodeState.UNHEALTHY) { + // Only add new node if old state is not UNHEALTHY + rmNode.context.getDispatcher().getEventHandler().handle( + new NodeAddedSchedulerEvent(newNode)); + } + } else { + // Reconnected node differs, so replace old node and start new node + switch (rmNode.getState()) { + case RUNNING: + ClusterMetrics.getMetrics().decrNumActiveNodes(); + break; + case UNHEALTHY: + ClusterMetrics.getMetrics().decrNumUnhealthyNMs(); + break; + } + rmNode.context.getRMNodes().put(newNode.getNodeID(), newNode); + rmNode.context.getDispatcher().getEventHandler().handle( + new RMNodeStartedEvent(newNode.getNodeID(), null, null)); + } + } else { + rmNode.httpPort = newNode.getHttpPort(); + rmNode.httpAddress = newNode.getHttpAddress(); + rmNode.totalCapability = newNode.getTotalCapability(); + + // Reset heartbeat ID since node just restarted. + rmNode.getLastNodeHeartBeatResponse().setResponseId(0); + } if (null != reconnectEvent.getRunningApplications()) { for (ApplicationId appId : reconnectEvent.getRunningApplications()) { @@ -564,7 +599,7 @@ public class RMNodeImpl implements RMNode, EventHandler { // Update scheduler node's capacity for reconnect node. rmNode.context.getDispatcher().getEventHandler().handle( new NodeResourceUpdateSchedulerEvent(rmNode, - ResourceOption.newInstance(rmNode.totalCapability, -1))); + ResourceOption.newInstance(newNode.getTotalCapability(), -1))); } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceTrackerService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceTrackerService.java index 48276205bf9..115f0b40a35 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceTrackerService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceTrackerService.java @@ -26,6 +26,7 @@ import static org.mockito.Mockito.verify; import java.io.File; import java.io.FileOutputStream; import java.io.IOException; +import java.util.ArrayList; import java.util.HashMap; import java.util.List; @@ -599,6 +600,16 @@ public class TestResourceTrackerService { dispatcher.await(); Assert.assertTrue(NodeAction.NORMAL.equals(response.getNodeAction())); Assert.assertEquals(5120 + 10240, metrics.getAvailableMB()); + + // reconnect of node with changed capability and running applications + List runningApps = new ArrayList(); + runningApps.add(ApplicationId.newInstance(1, 0)); + nm1 = rm.registerNode("host2:5678", 15360, 2, runningApps); + dispatcher.await(); + response = nm1.nodeHeartbeat(true); + dispatcher.await(); + Assert.assertTrue(NodeAction.NORMAL.equals(response.getNodeAction())); + Assert.assertEquals(5120 + 15360, metrics.getAvailableMB()); } private void writeToHostsFile(String... hosts) throws IOException {