YARN-3896. RMNode transitioned from RUNNING to REBOOTED because its response id has not been reset synchronously. (Jun Gong via rohithsharmaks)

(cherry picked from commit feaf034994)
This commit is contained in:
Rohith Sharma K S 2015-08-24 11:25:07 +05:30
parent f7ee225052
commit 6dc732f2f7
8 changed files with 72 additions and 6 deletions

View File

@ -149,6 +149,9 @@ public class NodeInfo {
return null; return null;
} }
public void resetLastNodeHeartBeatResponse() {
}
public List<UpdatedContainerInfo> pullContainerUpdates() { public List<UpdatedContainerInfo> pullContainerUpdates() {
ArrayList<UpdatedContainerInfo> list = new ArrayList<UpdatedContainerInfo>(); ArrayList<UpdatedContainerInfo> list = new ArrayList<UpdatedContainerInfo>();

View File

@ -134,6 +134,11 @@ public class RMNodeWrapper implements RMNode {
return node.getLastNodeHeartBeatResponse(); return node.getLastNodeHeartBeatResponse();
} }
@Override
public void resetLastNodeHeartBeatResponse() {
node.getLastNodeHeartBeatResponse().setResponseId(0);
}
@Override @Override
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
public List<UpdatedContainerInfo> pullContainerUpdates() { public List<UpdatedContainerInfo> pullContainerUpdates() {

View File

@ -740,6 +740,9 @@ Release 2.8.0 - UNRELEASED
YARN-3986. getTransferredContainers in AbstractYarnScheduler should be present YARN-3986. getTransferredContainers in AbstractYarnScheduler should be present
in YarnScheduler interface instead. (Varun Saxena via rohithsharmaks) in YarnScheduler interface instead. (Varun Saxena via rohithsharmaks)
YARN-3896. RMNode transitioned from RUNNING to REBOOTED because its response id
has not been reset synchronously. (Jun Gong via rohithsharmaks)
Release 2.7.2 - UNRELEASED Release 2.7.2 - UNRELEASED
INCOMPATIBLE CHANGES INCOMPATIBLE CHANGES

View File

@ -325,6 +325,8 @@ public class ResourceTrackerService extends AbstractService implements
} else { } else {
LOG.info("Reconnect from the node at: " + host); LOG.info("Reconnect from the node at: " + host);
this.nmLivelinessMonitor.unregister(nodeId); this.nmLivelinessMonitor.unregister(nodeId);
// Reset heartbeat ID since node just restarted.
oldNode.resetLastNodeHeartBeatResponse();
this.rmContext this.rmContext
.getDispatcher() .getDispatcher()
.getEventHandler() .getEventHandler()

View File

@ -129,7 +129,12 @@ public interface RMNode {
public void updateNodeHeartbeatResponseForCleanup(NodeHeartbeatResponse response); public void updateNodeHeartbeatResponseForCleanup(NodeHeartbeatResponse response);
public NodeHeartbeatResponse getLastNodeHeartBeatResponse(); public NodeHeartbeatResponse getLastNodeHeartBeatResponse();
/**
* Reset lastNodeHeartbeatResponse's ID to 0.
*/
void resetLastNodeHeartBeatResponse();
/** /**
* Get and clear the list of containerUpdates accumulated across NM * Get and clear the list of containerUpdates accumulated across NM
* heartbeats. * heartbeats.

View File

@ -443,6 +443,16 @@ public class RMNodeImpl implements RMNode, EventHandler<RMNodeEvent> {
} }
} }
@Override
public void resetLastNodeHeartBeatResponse() {
this.writeLock.lock();
try {
latestNodeHeartBeatResponse.setResponseId(0);
} finally {
this.writeLock.unlock();
}
}
public void handle(RMNodeEvent event) { public void handle(RMNodeEvent event) {
LOG.debug("Processing " + event.getNodeId() + " of type " + event.getType()); LOG.debug("Processing " + event.getNodeId() + " of type " + event.getType());
try { try {
@ -617,8 +627,6 @@ public class RMNodeImpl implements RMNode, EventHandler<RMNodeEvent> {
new NodeRemovedSchedulerEvent(rmNode)); new NodeRemovedSchedulerEvent(rmNode));
if (rmNode.getHttpPort() == newNode.getHttpPort()) { if (rmNode.getHttpPort() == newNode.getHttpPort()) {
// Reset heartbeat ID since node just restarted.
rmNode.getLastNodeHeartBeatResponse().setResponseId(0);
if (!rmNode.getTotalCapability().equals( if (!rmNode.getTotalCapability().equals(
newNode.getTotalCapability())) { newNode.getTotalCapability())) {
rmNode.totalCapability = newNode.getTotalCapability(); rmNode.totalCapability = newNode.getTotalCapability();
@ -656,9 +664,6 @@ public class RMNodeImpl implements RMNode, EventHandler<RMNodeEvent> {
handleNMContainerStatus(reconnectEvent.getNMContainerStatuses(), rmNode); handleNMContainerStatus(reconnectEvent.getNMContainerStatuses(), rmNode);
// Reset heartbeat ID since node just restarted.
rmNode.getLastNodeHeartBeatResponse().setResponseId(0);
for (ApplicationId appId : reconnectEvent.getRunningApplications()) { for (ApplicationId appId : reconnectEvent.getRunningApplications()) {
handleRunningAppOnNode(rmNode, rmNode.context, appId, rmNode.nodeId); handleRunningAppOnNode(rmNode, rmNode.context, appId, rmNode.nodeId);
} }

View File

@ -200,6 +200,10 @@ public class MockNodes {
return null; return null;
} }
@Override
public void resetLastNodeHeartBeatResponse() {
}
@Override @Override
public String getNodeManagerVersion() { public String getNodeManagerVersion() {
return null; return null;

View File

@ -21,6 +21,11 @@ package org.apache.hadoop.yarn.server.resourcemanager.resourcetracker;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.List; import java.util.List;
import org.apache.hadoop.yarn.api.records.NodeState;
import org.apache.hadoop.yarn.event.DrainDispatcher;
import org.apache.hadoop.yarn.server.resourcemanager.MockNM;
import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
import org.junit.Assert; import org.junit.Assert;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.NodeId;
@ -189,4 +194,38 @@ public class TestNMReconnect {
nlm.stop(); nlm.stop();
scheduler.stop(); scheduler.stop();
} }
@Test(timeout = 10000)
public void testRMNodeStatusAfterReconnect() throws Exception {
// The node(127.0.0.1:1234) reconnected with RM. When it registered with
// RM, RM set its lastNodeHeartbeatResponse's id to 0 asynchronously. But
// the node's heartbeat come before RM succeeded setting the id to 0.
final DrainDispatcher dispatcher = new DrainDispatcher();
MockRM rm = new MockRM(){
@Override
protected Dispatcher createDispatcher() {
return dispatcher;
}
};
rm.start();
MockNM nm1 =
new MockNM("127.0.0.1:1234", 15120, rm.getResourceTrackerService());
nm1.registerNode();
int i = 0;
while(i < 3) {
nm1.nodeHeartbeat(true);
dispatcher.await();
i++;
}
MockNM nm2 =
new MockNM("127.0.0.1:1234", 15120, rm.getResourceTrackerService());
nm2.registerNode();
RMNode rmNode = rm.getRMContext().getRMNodes().get(nm2.getNodeId());
nm2.nodeHeartbeat(true);
dispatcher.await();
Assert.assertEquals("Node is Not in Running state.", NodeState.RUNNING,
rmNode.getState());
rm.stop();
}
} }