YARN-3222. Fixed NPE on RMNodeImpl#ReconnectNodeTransition when a node is reconnected with a different port. Contributed by Rohith Sharmaks
(cherry picked from commitb2f1ec312e
) (cherry picked from commit888a445638
) (cherry picked from commit b78f87825bd593e30b2f2ea76f37c7a4fd673ab2)
This commit is contained in:
parent
6e090bc53d
commit
db92b09e03
|
@ -568,8 +568,8 @@ public class RMNodeImpl implements RMNode, EventHandler<RMNodeEvent> {
|
|||
if (rmNode.getHttpPort() == newNode.getHttpPort()) {
|
||||
// Reset heartbeat ID since node just restarted.
|
||||
rmNode.getLastNodeHeartBeatResponse().setResponseId(0);
|
||||
if (rmNode.getState() != NodeState.UNHEALTHY) {
|
||||
// Only add new node if old state is not UNHEALTHY
|
||||
if (rmNode.getState().equals(NodeState.RUNNING)) {
|
||||
// Only add new node if old state is RUNNING
|
||||
rmNode.context.getDispatcher().getEventHandler().handle(
|
||||
new NodeAddedSchedulerEvent(newNode));
|
||||
}
|
||||
|
@ -590,28 +590,30 @@ public class RMNodeImpl implements RMNode, EventHandler<RMNodeEvent> {
|
|||
} else {
|
||||
rmNode.httpPort = newNode.getHttpPort();
|
||||
rmNode.httpAddress = newNode.getHttpAddress();
|
||||
rmNode.totalCapability = newNode.getTotalCapability();
|
||||
boolean isCapabilityChanged = false;
|
||||
if (rmNode.getTotalCapability() != newNode.getTotalCapability()) {
|
||||
rmNode.totalCapability = newNode.getTotalCapability();
|
||||
isCapabilityChanged = true;
|
||||
}
|
||||
|
||||
// Reset heartbeat ID since node just restarted.
|
||||
rmNode.getLastNodeHeartBeatResponse().setResponseId(0);
|
||||
}
|
||||
|
||||
if (null != reconnectEvent.getRunningApplications()) {
|
||||
for (ApplicationId appId : reconnectEvent.getRunningApplications()) {
|
||||
handleRunningAppOnNode(rmNode, rmNode.context, appId, rmNode.nodeId);
|
||||
}
|
||||
}
|
||||
|
||||
rmNode.context.getDispatcher().getEventHandler().handle(
|
||||
new NodesListManagerEvent(
|
||||
NodesListManagerEventType.NODE_USABLE, rmNode));
|
||||
if (rmNode.getState().equals(NodeState.RUNNING)) {
|
||||
// Update scheduler node's capacity for reconnect node.
|
||||
rmNode.context.getDispatcher().getEventHandler().handle(
|
||||
new NodeResourceUpdateSchedulerEvent(rmNode,
|
||||
ResourceOption.newInstance(newNode.getTotalCapability(), -1)));
|
||||
if (isCapabilityChanged
|
||||
&& rmNode.getState().equals(NodeState.RUNNING)) {
|
||||
// Update scheduler node's capacity for reconnect node.
|
||||
rmNode.context
|
||||
.getDispatcher()
|
||||
.getEventHandler()
|
||||
.handle(
|
||||
new NodeResourceUpdateSchedulerEvent(rmNode, ResourceOption
|
||||
.newInstance(newNode.getTotalCapability(), -1)));
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -51,7 +51,7 @@ public class MockNM {
|
|||
private final int memory;
|
||||
private final int vCores;
|
||||
private ResourceTrackerService resourceTracker;
|
||||
private final int httpPort = 2;
|
||||
private int httpPort = 2;
|
||||
private MasterKey currentContainerTokenMasterKey;
|
||||
private MasterKey currentNMTokenMasterKey;
|
||||
private String version;
|
||||
|
@ -87,6 +87,10 @@ public class MockNM {
|
|||
return httpPort;
|
||||
}
|
||||
|
||||
public void setHttpPort(int port) {
|
||||
httpPort = port;
|
||||
}
|
||||
|
||||
public void setResourceTrackerService(ResourceTrackerService resourceTracker) {
|
||||
this.resourceTracker = resourceTracker;
|
||||
}
|
||||
|
|
|
@ -56,6 +56,7 @@ import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerResp
|
|||
import org.apache.hadoop.yarn.server.api.records.NodeAction;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptImpl;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent;
|
||||
import org.apache.hadoop.yarn.server.utils.BuilderUtils;
|
||||
|
@ -631,6 +632,20 @@ public class TestResourceTrackerService {
|
|||
dispatcher.await();
|
||||
Assert.assertTrue(NodeAction.NORMAL.equals(response.getNodeAction()));
|
||||
Assert.assertEquals(5120 + 15360, metrics.getAvailableMB());
|
||||
|
||||
// reconnect healthy node changing http port
|
||||
nm1 = new MockNM("host1:1234", 5120, rm.getResourceTrackerService());
|
||||
nm1.setHttpPort(3);
|
||||
nm1.registerNode();
|
||||
dispatcher.await();
|
||||
response = nm1.nodeHeartbeat(true);
|
||||
response = nm1.nodeHeartbeat(true);
|
||||
dispatcher.await();
|
||||
RMNode rmNode = rm.getRMContext().getRMNodes().get(nm1.getNodeId());
|
||||
Assert.assertEquals(3, rmNode.getHttpPort());
|
||||
Assert.assertEquals(5120, rmNode.getTotalCapability().getMemory());
|
||||
Assert.assertEquals(5120 + 15360, metrics.getAvailableMB());
|
||||
|
||||
}
|
||||
|
||||
private void writeToHostsFile(String... hosts) throws IOException {
|
||||
|
|
Loading…
Reference in New Issue