From db92b09e0396d4f0e5b227c40a1c0a8ca62eb048 Mon Sep 17 00:00:00 2001 From: Jian He Date: Tue, 3 Mar 2015 16:25:57 -0800 Subject: [PATCH] YARN-3222. Fixed NPE on RMNodeImpl#ReconnectNodeTransition when a node is reconnected with a different port. Contributed by Rohith Sharmaks (cherry picked from commit b2f1ec312ee431aef762cfb49cb29cd6f4661e86) (cherry picked from commit 888a44563819ba910dc3cc10d10ee0fb8f05db61) (cherry picked from commit b78f87825bd593e30b2f2ea76f37c7a4fd673ab2) --- .../resourcemanager/rmnode/RMNodeImpl.java | 34 ++++++++++--------- .../yarn/server/resourcemanager/MockNM.java | 6 +++- .../TestResourceTrackerService.java | 17 +++++++++- 3 files changed, 39 insertions(+), 18 deletions(-) diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java index 1774eb54e69..b92f399f411 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java @@ -564,12 +564,12 @@ public class RMNodeImpl implements RMNode, EventHandler { rmNode.nodeUpdateQueue.clear(); rmNode.context.getDispatcher().getEventHandler().handle( new NodeRemovedSchedulerEvent(rmNode)); - + if (rmNode.getHttpPort() == newNode.getHttpPort()) { // Reset heartbeat ID since node just restarted. rmNode.getLastNodeHeartBeatResponse().setResponseId(0); - if (rmNode.getState() != NodeState.UNHEALTHY) { - // Only add new node if old state is not UNHEALTHY + if (rmNode.getState().equals(NodeState.RUNNING)) { + // Only add new node if old state is RUNNING rmNode.context.getDispatcher().getEventHandler().handle( new NodeAddedSchedulerEvent(newNode)); } @@ -590,28 +590,30 @@ public class RMNodeImpl implements RMNode, EventHandler { } else { rmNode.httpPort = newNode.getHttpPort(); rmNode.httpAddress = newNode.getHttpAddress(); - rmNode.totalCapability = newNode.getTotalCapability(); + boolean isCapabilityChanged = false; + if (rmNode.getTotalCapability() != newNode.getTotalCapability()) { + rmNode.totalCapability = newNode.getTotalCapability(); + isCapabilityChanged = true; + } // Reset heartbeat ID since node just restarted. rmNode.getLastNodeHeartBeatResponse().setResponseId(0); - } - if (null != reconnectEvent.getRunningApplications()) { for (ApplicationId appId : reconnectEvent.getRunningApplications()) { handleRunningAppOnNode(rmNode, rmNode.context, appId, rmNode.nodeId); } - } - rmNode.context.getDispatcher().getEventHandler().handle( - new NodesListManagerEvent( - NodesListManagerEventType.NODE_USABLE, rmNode)); - if (rmNode.getState().equals(NodeState.RUNNING)) { - // Update scheduler node's capacity for reconnect node. - rmNode.context.getDispatcher().getEventHandler().handle( - new NodeResourceUpdateSchedulerEvent(rmNode, - ResourceOption.newInstance(newNode.getTotalCapability(), -1))); + if (isCapabilityChanged + && rmNode.getState().equals(NodeState.RUNNING)) { + // Update scheduler node's capacity for reconnect node. + rmNode.context + .getDispatcher() + .getEventHandler() + .handle( + new NodeResourceUpdateSchedulerEvent(rmNode, ResourceOption + .newInstance(newNode.getTotalCapability(), -1))); + } } - } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNM.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNM.java index 5f53805c21a..c917f7976b0 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNM.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNM.java @@ -51,7 +51,7 @@ public class MockNM { private final int memory; private final int vCores; private ResourceTrackerService resourceTracker; - private final int httpPort = 2; + private int httpPort = 2; private MasterKey currentContainerTokenMasterKey; private MasterKey currentNMTokenMasterKey; private String version; @@ -87,6 +87,10 @@ public class MockNM { return httpPort; } + public void setHttpPort(int port) { + httpPort = port; + } + public void setResourceTrackerService(ResourceTrackerService resourceTracker) { this.resourceTracker = resourceTracker; } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceTrackerService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceTrackerService.java index 009b9f432b4..7f173edd8ca 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceTrackerService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceTrackerService.java @@ -56,6 +56,7 @@ import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerResp import org.apache.hadoop.yarn.server.api.records.NodeAction; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptImpl; +import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent; import org.apache.hadoop.yarn.server.utils.BuilderUtils; @@ -621,7 +622,7 @@ public class TestResourceTrackerService { dispatcher.await(); Assert.assertTrue(NodeAction.NORMAL.equals(response.getNodeAction())); Assert.assertEquals(5120 + 10240, metrics.getAvailableMB()); - + // reconnect of node with changed capability and running applications List runningApps = new ArrayList(); runningApps.add(ApplicationId.newInstance(1, 0)); @@ -631,6 +632,20 @@ public class TestResourceTrackerService { dispatcher.await(); Assert.assertTrue(NodeAction.NORMAL.equals(response.getNodeAction())); Assert.assertEquals(5120 + 15360, metrics.getAvailableMB()); + + // reconnect healthy node changing http port + nm1 = new MockNM("host1:1234", 5120, rm.getResourceTrackerService()); + nm1.setHttpPort(3); + nm1.registerNode(); + dispatcher.await(); + response = nm1.nodeHeartbeat(true); + response = nm1.nodeHeartbeat(true); + dispatcher.await(); + RMNode rmNode = rm.getRMContext().getRMNodes().get(nm1.getNodeId()); + Assert.assertEquals(3, rmNode.getHttpPort()); + Assert.assertEquals(5120, rmNode.getTotalCapability().getMemory()); + Assert.assertEquals(5120 + 15360, metrics.getAvailableMB()); + } private void writeToHostsFile(String... hosts) throws IOException {