YARN-2561. MR job client cannot reconnect to AM after NM restart. Contributed by Junping Du

This commit is contained in:
Jason Lowe 2014-09-18 21:34:40 +00:00
parent 1cf3198047
commit a337f0e354
3 changed files with 55 additions and 6 deletions

View File

@ -391,6 +391,9 @@ Release 2.6.0 - UNRELEASED
YARN-2363. Submitted applications occasionally lack a tracking URL (jlowe) YARN-2363. Submitted applications occasionally lack a tracking URL (jlowe)
YARN-2561. MR job client cannot reconnect to AM after NM restart. (Junping
Du via jlowe)
Release 2.5.1 - 2014-09-05 Release 2.5.1 - 2014-09-05
INCOMPATIBLE CHANGES INCOMPATIBLE CHANGES

View File

@ -544,12 +544,47 @@ public class RMNodeImpl implements RMNode, EventHandler<RMNodeEvent> {
RMNodeReconnectEvent reconnectEvent = (RMNodeReconnectEvent) event; RMNodeReconnectEvent reconnectEvent = (RMNodeReconnectEvent) event;
RMNode newNode = reconnectEvent.getReconnectedNode(); RMNode newNode = reconnectEvent.getReconnectedNode();
rmNode.nodeManagerVersion = newNode.getNodeManagerVersion(); rmNode.nodeManagerVersion = newNode.getNodeManagerVersion();
rmNode.httpPort = newNode.getHttpPort(); List<ApplicationId> runningApps = reconnectEvent.getRunningApplications();
rmNode.httpAddress = newNode.getHttpAddress(); boolean noRunningApps =
rmNode.totalCapability = newNode.getTotalCapability(); (runningApps == null) || (runningApps.size() == 0);
// Reset heartbeat ID since node just restarted. // No application running on the node, so send node-removal event with
rmNode.getLastNodeHeartBeatResponse().setResponseId(0); // cleaning up old container info.
if (noRunningApps) {
rmNode.nodeUpdateQueue.clear();
rmNode.context.getDispatcher().getEventHandler().handle(
new NodeRemovedSchedulerEvent(rmNode));
if (rmNode.getHttpPort() == newNode.getHttpPort()) {
// Reset heartbeat ID since node just restarted.
rmNode.getLastNodeHeartBeatResponse().setResponseId(0);
if (rmNode.getState() != NodeState.UNHEALTHY) {
// Only add new node if old state is not UNHEALTHY
rmNode.context.getDispatcher().getEventHandler().handle(
new NodeAddedSchedulerEvent(newNode));
}
} else {
// Reconnected node differs, so replace old node and start new node
switch (rmNode.getState()) {
case RUNNING:
ClusterMetrics.getMetrics().decrNumActiveNodes();
break;
case UNHEALTHY:
ClusterMetrics.getMetrics().decrNumUnhealthyNMs();
break;
}
rmNode.context.getRMNodes().put(newNode.getNodeID(), newNode);
rmNode.context.getDispatcher().getEventHandler().handle(
new RMNodeStartedEvent(newNode.getNodeID(), null, null));
}
} else {
rmNode.httpPort = newNode.getHttpPort();
rmNode.httpAddress = newNode.getHttpAddress();
rmNode.totalCapability = newNode.getTotalCapability();
// Reset heartbeat ID since node just restarted.
rmNode.getLastNodeHeartBeatResponse().setResponseId(0);
}
if (null != reconnectEvent.getRunningApplications()) { if (null != reconnectEvent.getRunningApplications()) {
for (ApplicationId appId : reconnectEvent.getRunningApplications()) { for (ApplicationId appId : reconnectEvent.getRunningApplications()) {
@ -564,7 +599,7 @@ public class RMNodeImpl implements RMNode, EventHandler<RMNodeEvent> {
// Update scheduler node's capacity for reconnect node. // Update scheduler node's capacity for reconnect node.
rmNode.context.getDispatcher().getEventHandler().handle( rmNode.context.getDispatcher().getEventHandler().handle(
new NodeResourceUpdateSchedulerEvent(rmNode, new NodeResourceUpdateSchedulerEvent(rmNode,
ResourceOption.newInstance(rmNode.totalCapability, -1))); ResourceOption.newInstance(newNode.getTotalCapability(), -1)));
} }
} }

View File

@ -26,6 +26,7 @@ import static org.mockito.Mockito.verify;
import java.io.File; import java.io.File;
import java.io.FileOutputStream; import java.io.FileOutputStream;
import java.io.IOException; import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap; import java.util.HashMap;
import java.util.List; import java.util.List;
@ -599,6 +600,16 @@ public class TestResourceTrackerService {
dispatcher.await(); dispatcher.await();
Assert.assertTrue(NodeAction.NORMAL.equals(response.getNodeAction())); Assert.assertTrue(NodeAction.NORMAL.equals(response.getNodeAction()));
Assert.assertEquals(5120 + 10240, metrics.getAvailableMB()); Assert.assertEquals(5120 + 10240, metrics.getAvailableMB());
// reconnect of node with changed capability and running applications
List<ApplicationId> runningApps = new ArrayList<ApplicationId>();
runningApps.add(ApplicationId.newInstance(1, 0));
nm1 = rm.registerNode("host2:5678", 15360, 2, runningApps);
dispatcher.await();
response = nm1.nodeHeartbeat(true);
dispatcher.await();
Assert.assertTrue(NodeAction.NORMAL.equals(response.getNodeAction()));
Assert.assertEquals(5120 + 15360, metrics.getAvailableMB());
} }
private void writeToHostsFile(String... hosts) throws IOException { private void writeToHostsFile(String... hosts) throws IOException {