YARN-2561. MR job client cannot reconnect to AM after NM restart. Contributed by Junping Du
(cherry picked from commit a337f0e354
)
This commit is contained in:
parent
4b4e44a8aa
commit
d9273a9547
|
@ -361,6 +361,9 @@ Release 2.6.0 - UNRELEASED
|
|||
|
||||
YARN-2363. Submitted applications occasionally lack a tracking URL (jlowe)
|
||||
|
||||
YARN-2561. MR job client cannot reconnect to AM after NM restart. (Junping
|
||||
Du via jlowe)
|
||||
|
||||
Release 2.5.1 - 2014-09-05
|
||||
|
||||
INCOMPATIBLE CHANGES
|
||||
|
|
|
@ -544,12 +544,47 @@ public class RMNodeImpl implements RMNode, EventHandler<RMNodeEvent> {
|
|||
RMNodeReconnectEvent reconnectEvent = (RMNodeReconnectEvent) event;
|
||||
RMNode newNode = reconnectEvent.getReconnectedNode();
|
||||
rmNode.nodeManagerVersion = newNode.getNodeManagerVersion();
|
||||
rmNode.httpPort = newNode.getHttpPort();
|
||||
rmNode.httpAddress = newNode.getHttpAddress();
|
||||
rmNode.totalCapability = newNode.getTotalCapability();
|
||||
List<ApplicationId> runningApps = reconnectEvent.getRunningApplications();
|
||||
boolean noRunningApps =
|
||||
(runningApps == null) || (runningApps.size() == 0);
|
||||
|
||||
// Reset heartbeat ID since node just restarted.
|
||||
rmNode.getLastNodeHeartBeatResponse().setResponseId(0);
|
||||
// No application running on the node, so send node-removal event with
|
||||
// cleaning up old container info.
|
||||
if (noRunningApps) {
|
||||
rmNode.nodeUpdateQueue.clear();
|
||||
rmNode.context.getDispatcher().getEventHandler().handle(
|
||||
new NodeRemovedSchedulerEvent(rmNode));
|
||||
|
||||
if (rmNode.getHttpPort() == newNode.getHttpPort()) {
|
||||
// Reset heartbeat ID since node just restarted.
|
||||
rmNode.getLastNodeHeartBeatResponse().setResponseId(0);
|
||||
if (rmNode.getState() != NodeState.UNHEALTHY) {
|
||||
// Only add new node if old state is not UNHEALTHY
|
||||
rmNode.context.getDispatcher().getEventHandler().handle(
|
||||
new NodeAddedSchedulerEvent(newNode));
|
||||
}
|
||||
} else {
|
||||
// Reconnected node differs, so replace old node and start new node
|
||||
switch (rmNode.getState()) {
|
||||
case RUNNING:
|
||||
ClusterMetrics.getMetrics().decrNumActiveNodes();
|
||||
break;
|
||||
case UNHEALTHY:
|
||||
ClusterMetrics.getMetrics().decrNumUnhealthyNMs();
|
||||
break;
|
||||
}
|
||||
rmNode.context.getRMNodes().put(newNode.getNodeID(), newNode);
|
||||
rmNode.context.getDispatcher().getEventHandler().handle(
|
||||
new RMNodeStartedEvent(newNode.getNodeID(), null, null));
|
||||
}
|
||||
} else {
|
||||
rmNode.httpPort = newNode.getHttpPort();
|
||||
rmNode.httpAddress = newNode.getHttpAddress();
|
||||
rmNode.totalCapability = newNode.getTotalCapability();
|
||||
|
||||
// Reset heartbeat ID since node just restarted.
|
||||
rmNode.getLastNodeHeartBeatResponse().setResponseId(0);
|
||||
}
|
||||
|
||||
if (null != reconnectEvent.getRunningApplications()) {
|
||||
for (ApplicationId appId : reconnectEvent.getRunningApplications()) {
|
||||
|
@ -564,7 +599,7 @@ public class RMNodeImpl implements RMNode, EventHandler<RMNodeEvent> {
|
|||
// Update scheduler node's capacity for reconnect node.
|
||||
rmNode.context.getDispatcher().getEventHandler().handle(
|
||||
new NodeResourceUpdateSchedulerEvent(rmNode,
|
||||
ResourceOption.newInstance(rmNode.totalCapability, -1)));
|
||||
ResourceOption.newInstance(newNode.getTotalCapability(), -1)));
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -26,6 +26,7 @@ import static org.mockito.Mockito.verify;
|
|||
import java.io.File;
|
||||
import java.io.FileOutputStream;
|
||||
import java.io.IOException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
|
||||
|
@ -599,6 +600,16 @@ public class TestResourceTrackerService {
|
|||
dispatcher.await();
|
||||
Assert.assertTrue(NodeAction.NORMAL.equals(response.getNodeAction()));
|
||||
Assert.assertEquals(5120 + 10240, metrics.getAvailableMB());
|
||||
|
||||
// reconnect of node with changed capability and running applications
|
||||
List<ApplicationId> runningApps = new ArrayList<ApplicationId>();
|
||||
runningApps.add(ApplicationId.newInstance(1, 0));
|
||||
nm1 = rm.registerNode("host2:5678", 15360, 2, runningApps);
|
||||
dispatcher.await();
|
||||
response = nm1.nodeHeartbeat(true);
|
||||
dispatcher.await();
|
||||
Assert.assertTrue(NodeAction.NORMAL.equals(response.getNodeAction()));
|
||||
Assert.assertEquals(5120 + 15360, metrics.getAvailableMB());
|
||||
}
|
||||
|
||||
private void writeToHostsFile(String... hosts) throws IOException {
|
||||
|
|
Loading…
Reference in New Issue