YARN-3222. Fixed NPE on RMNodeImpl#ReconnectNodeTransition when a node is reconnected with a different port. Contributed by Rohith Sharmaks

(cherry picked from commit b2f1ec312e)
This commit is contained in:
Jian He 2015-03-03 16:25:57 -08:00
parent 824c32de1a
commit 888a445638
3 changed files with 39 additions and 18 deletions

View File

@ -571,12 +571,12 @@ public class RMNodeImpl implements RMNode, EventHandler<RMNodeEvent> {
rmNode.nodeUpdateQueue.clear();
rmNode.context.getDispatcher().getEventHandler().handle(
new NodeRemovedSchedulerEvent(rmNode));
if (rmNode.getHttpPort() == newNode.getHttpPort()) {
// Reset heartbeat ID since node just restarted.
rmNode.getLastNodeHeartBeatResponse().setResponseId(0);
if (rmNode.getState() != NodeState.UNHEALTHY) {
// Only add new node if old state is not UNHEALTHY
if (rmNode.getState().equals(NodeState.RUNNING)) {
// Only add new node if old state is RUNNING
rmNode.context.getDispatcher().getEventHandler().handle(
new NodeAddedSchedulerEvent(newNode));
}
@ -599,30 +599,32 @@ public class RMNodeImpl implements RMNode, EventHandler<RMNodeEvent> {
} else {
rmNode.httpPort = newNode.getHttpPort();
rmNode.httpAddress = newNode.getHttpAddress();
rmNode.totalCapability = newNode.getTotalCapability();
boolean isCapabilityChanged = false;
if (rmNode.getTotalCapability() != newNode.getTotalCapability()) {
rmNode.totalCapability = newNode.getTotalCapability();
isCapabilityChanged = true;
}
handleNMContainerStatus(reconnectEvent.getNMContainerStatuses(), rmNode);
// Reset heartbeat ID since node just restarted.
rmNode.getLastNodeHeartBeatResponse().setResponseId(0);
}
if (null != reconnectEvent.getRunningApplications()) {
for (ApplicationId appId : reconnectEvent.getRunningApplications()) {
handleRunningAppOnNode(rmNode, rmNode.context, appId, rmNode.nodeId);
}
}
rmNode.context.getDispatcher().getEventHandler().handle(
new NodesListManagerEvent(
NodesListManagerEventType.NODE_USABLE, rmNode));
if (rmNode.getState().equals(NodeState.RUNNING)) {
// Update scheduler node's capacity for reconnect node.
rmNode.context.getDispatcher().getEventHandler().handle(
new NodeResourceUpdateSchedulerEvent(rmNode,
ResourceOption.newInstance(newNode.getTotalCapability(), -1)));
if (isCapabilityChanged
&& rmNode.getState().equals(NodeState.RUNNING)) {
// Update scheduler node's capacity for reconnect node.
rmNode.context
.getDispatcher()
.getEventHandler()
.handle(
new NodeResourceUpdateSchedulerEvent(rmNode, ResourceOption
.newInstance(newNode.getTotalCapability(), -1)));
}
}
}
private void handleNMContainerStatus(

View File

@ -51,7 +51,7 @@ public class MockNM {
private final int memory;
private final int vCores;
private ResourceTrackerService resourceTracker;
private final int httpPort = 2;
private int httpPort = 2;
private MasterKey currentContainerTokenMasterKey;
private MasterKey currentNMTokenMasterKey;
private String version;
@ -87,6 +87,10 @@ public class MockNM {
return httpPort;
}
public void setHttpPort(int port) {
httpPort = port;
}
public void setResourceTrackerService(ResourceTrackerService resourceTracker) {
this.resourceTracker = resourceTracker;
}

View File

@ -56,6 +56,7 @@ import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerResp
import org.apache.hadoop.yarn.server.api.records.NodeAction;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptImpl;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent;
import org.apache.hadoop.yarn.server.utils.BuilderUtils;
@ -623,7 +624,7 @@ public class TestResourceTrackerService {
dispatcher.await();
Assert.assertTrue(NodeAction.NORMAL.equals(response.getNodeAction()));
Assert.assertEquals(5120 + 10240, metrics.getAvailableMB());
// reconnect of node with changed capability and running applications
List<ApplicationId> runningApps = new ArrayList<ApplicationId>();
runningApps.add(ApplicationId.newInstance(1, 0));
@ -633,6 +634,20 @@ public class TestResourceTrackerService {
dispatcher.await();
Assert.assertTrue(NodeAction.NORMAL.equals(response.getNodeAction()));
Assert.assertEquals(5120 + 15360, metrics.getAvailableMB());
// reconnect healthy node changing http port
nm1 = new MockNM("host1:1234", 5120, rm.getResourceTrackerService());
nm1.setHttpPort(3);
nm1.registerNode();
dispatcher.await();
response = nm1.nodeHeartbeat(true);
response = nm1.nodeHeartbeat(true);
dispatcher.await();
RMNode rmNode = rm.getRMContext().getRMNodes().get(nm1.getNodeId());
Assert.assertEquals(3, rmNode.getHttpPort());
Assert.assertEquals(5120, rmNode.getTotalCapability().getMemory());
Assert.assertEquals(5120 + 15360, metrics.getAvailableMB());
}
private void writeToHostsFile(String... hosts) throws IOException {