YARN-590. Added an optional message to be returned by ResourceManager when RM asks an NM to shutdown/resync etc so that NMs can log this message locally for better debuggability. Contributed by Mayank Bansal.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1481234 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Vinod Kumar Vavilapalli 2013-05-10 23:37:44 +00:00
parent a35c7fd80b
commit 68148989bf
11 changed files with 110 additions and 20 deletions

View File

@ -246,6 +246,10 @@ Release 2.0.5-beta - UNRELEASED
YARN-663. Changed ResourceTracker API and LocalizationProtocol API to throw
YarnRemoteException and IOException. (Xuan Gong via vinodkv)
YARN-590. Added an optional message to be returned by ResourceManager when RM
asks an NM to shutdown/resync etc so that NMs can log this message locally
for better debuggability. (Mayank Bansal via vinodkv)
OPTIMIZATIONS
BUG FIXES

View File

@ -45,4 +45,8 @@ public interface NodeHeartbeatResponse {
long getNextHeartBeatInterval();
void setNextHeartBeatInterval(long nextHeartBeatInterval);
/**
 * Returns the optional diagnostics message attached to this heartbeat response.
 *
 * @return the diagnostics message; may be {@code null} (the protobuf-backed
 *         implementation returns {@code null} when the field is not set)
 */
String getDiagnosticsMessage();
/**
 * Sets the optional diagnostics message carried by this heartbeat response.
 *
 * @param diagnosticsMessage the message text; the protobuf-backed
 *        implementation clears the field when this is {@code null}
 */
void setDiagnosticsMessage(String diagnosticsMessage);
}

View File

@ -33,4 +33,9 @@ public interface RegisterNodeManagerResponse {
long getRMIdentifier();
void setRMIdentifier(long rmIdentifier);
/**
 * Returns the optional diagnostics message attached to this registration response.
 *
 * @return the diagnostics message; may be {@code null} (the protobuf-backed
 *         implementation returns {@code null} when the field is not set)
 */
String getDiagnosticsMessage();
/**
 * Sets the optional diagnostics message carried by this registration response.
 *
 * @param diagnosticsMessage the message text; the protobuf-backed
 *        implementation clears the field when this is {@code null}
 */
void setDiagnosticsMessage(String diagnosticsMessage);
}

View File

@ -145,6 +145,25 @@ public class NodeHeartbeatResponsePBImpl extends ProtoBase<NodeHeartbeatResponse
builder.setNodeAction(convertToProtoFormat(nodeAction));
}
/**
 * Reads the diagnostics message from whichever backing object is current
 * (the immutable proto or the mutable builder).
 *
 * @return the diagnostics message, or {@code null} when the field is not set
 */
@Override
public String getDiagnosticsMessage() {
  final NodeHeartbeatResponseProtoOrBuilder source;
  if (viaProto) {
    source = proto;
  } else {
    source = builder;
  }
  return source.hasDiagnosticsMessage() ? source.getDiagnosticsMessage() : null;
}
/**
 * Stores the diagnostics message in the builder, clearing the field when
 * no message is supplied.
 *
 * @param diagnosticsMessage the message text, or {@code null} to clear the field
 */
@Override
public void setDiagnosticsMessage(String diagnosticsMessage) {
  maybeInitBuilder();
  if (diagnosticsMessage != null) {
    builder.setDiagnosticsMessage(diagnosticsMessage);
  } else {
    // A null message means "unset": remove the optional field from the builder.
    builder.clearDiagnosticsMessage();
  }
}
@Override
public List<ContainerId> getContainersToCleanup() {
initContainersToCleanup();

View File

@ -101,6 +101,25 @@ public class RegisterNodeManagerResponsePBImpl extends ProtoBase<RegisterNodeMan
rebuild = true;
}
/**
 * Reads the diagnostics message from whichever backing object is current
 * (the immutable proto or the mutable builder).
 *
 * @return the diagnostics message, or {@code null} when the field is not set
 */
@Override
public String getDiagnosticsMessage() {
  final RegisterNodeManagerResponseProtoOrBuilder source;
  if (viaProto) {
    source = proto;
  } else {
    source = builder;
  }
  return source.hasDiagnosticsMessage() ? source.getDiagnosticsMessage() : null;
}
/**
 * Stores the diagnostics message in the builder, clearing the field when
 * no message is supplied.
 *
 * @param diagnosticsMessage the message text, or {@code null} to clear the field
 */
@Override
public void setDiagnosticsMessage(String diagnosticsMessage) {
  maybeInitBuilder();
  if (diagnosticsMessage != null) {
    builder.setDiagnosticsMessage(diagnosticsMessage);
  } else {
    // A null message means "unset": remove the optional field from the builder.
    builder.clearDiagnosticsMessage();
  }
}
@Override
public NodeAction getNodeAction() {
RegisterNodeManagerResponseProtoOrBuilder p = viaProto ? proto : builder;

View File

@ -34,6 +34,7 @@ message RegisterNodeManagerResponseProto {
optional MasterKeyProto master_key = 1;
optional NodeActionProto nodeAction = 2;
optional int64 rm_identifier = 3;
optional string diagnostics_message = 4;
}
message NodeHeartbeatRequestProto {
@ -49,4 +50,5 @@ message NodeHeartbeatResponseProto {
repeated ContainerIdProto containers_to_cleanup = 4;
repeated ApplicationIdProto applications_to_cleanup = 5;
optional int64 nextHeartBeatInterval = 6;
optional string diagnostics_message = 7;
}

View File

@ -295,8 +295,12 @@ public class NodeStatusUpdaterImpl extends AbstractService implements
}
// if the Resourcemanager instructs NM to shutdown.
if (NodeAction.SHUTDOWN.equals(regNMResponse.getNodeAction())) {
String message =
"Message from ResourceManager: "
+ regNMResponse.getDiagnosticsMessage();
throw new YarnException(
"Recieved SHUTDOWN signal from Resourcemanager ,Registration of NodeManager failed");
"Recieved SHUTDOWN signal from Resourcemanager ,Registration of NodeManager failed, "
+ message);
}
if (UserGroupInformation.isSecurityEnabled()) {
@ -482,15 +486,19 @@ public class NodeStatusUpdaterImpl extends AbstractService implements
if (response.getNodeAction() == NodeAction.SHUTDOWN) {
LOG
.info("Recieved SHUTDOWN signal from Resourcemanager as part of heartbeat," +
" hence shutting down.");
.warn("Recieved SHUTDOWN signal from Resourcemanager as part of heartbeat,"
+ " hence shutting down.");
LOG.warn("Message from ResourceManager: "
+ response.getDiagnosticsMessage());
dispatcher.getEventHandler().handle(
new NodeManagerEvent(NodeManagerEventType.SHUTDOWN));
break;
}
if (response.getNodeAction() == NodeAction.RESYNC) {
LOG.info("Node is out of sync with ResourceManager,"
LOG.warn("Node is out of sync with ResourceManager,"
+ " hence rebooting.");
LOG.warn("Message from ResourceManager: "
+ response.getDiagnosticsMessage());
// Invalidate the RMIdentifier while resync
NodeStatusUpdaterImpl.this.rmIdentifier =
ResourceManagerConstants.RM_INVALID_IDENTIFIER;

View File

@ -388,6 +388,7 @@ public class TestNodeStatusUpdater {
private class MyResourceTracker2 implements ResourceTracker {
public NodeAction heartBeatNodeAction = NodeAction.NORMAL;
public NodeAction registerNodeAction = NodeAction.NORMAL;
public String shutDownMessage = "";
@Override
public RegisterNodeManagerResponse registerNodeManager(
@ -397,6 +398,7 @@ public class TestNodeStatusUpdater {
RegisterNodeManagerResponse response = recordFactory
.newRecordInstance(RegisterNodeManagerResponse.class);
response.setNodeAction(registerNodeAction );
response.setDiagnosticsMessage(shutDownMessage);
return response;
}
@Override
@ -408,6 +410,7 @@ public class TestNodeStatusUpdater {
NodeHeartbeatResponse nhResponse = YarnServerBuilderUtils.
newNodeHeartbeatResponse(heartBeatID, heartBeatNodeAction, null,
null, null, 1000L);
nhResponse.setDiagnosticsMessage(shutDownMessage);
return nhResponse;
}
}
@ -737,12 +740,15 @@ public class TestNodeStatusUpdater {
context, dispatcher, healthChecker, metrics);
MyResourceTracker2 myResourceTracker2 = new MyResourceTracker2();
myResourceTracker2.registerNodeAction = NodeAction.SHUTDOWN;
myResourceTracker2.shutDownMessage = "RM Shutting Down Node";
nodeStatusUpdater.resourceTracker = myResourceTracker2;
return nodeStatusUpdater;
}
};
verifyNodeStartFailure("org.apache.hadoop.yarn.YarnException: "
+ "Recieved SHUTDOWN signal from Resourcemanager ,Registration of NodeManager failed");
+ "Recieved SHUTDOWN signal from Resourcemanager ,"
+ "Registration of NodeManager failed, "
+ "Message from ResourceManager: RM Shutting Down Node");
}
@Test (timeout = 15000)

View File

@ -176,8 +176,11 @@ public class ResourceTrackerService extends AbstractService implements
// Check if this node is a 'valid' node
if (!this.nodesListManager.isValidNode(host)) {
LOG.info("Disallowed NodeManager from " + host
+ ", Sending SHUTDOWN signal to the NodeManager.");
String message =
"Disallowed NodeManager from " + host
+ ", Sending SHUTDOWN signal to the NodeManager.";
LOG.info(message);
response.setDiagnosticsMessage(message);
response.setNodeAction(NodeAction.SHUTDOWN);
return response;
}
@ -185,9 +188,12 @@ public class ResourceTrackerService extends AbstractService implements
// Check if this node has minimum allocations
if (capability.getMemory() < minAllocMb
|| capability.getVirtualCores() < minAllocVcores) {
LOG.info("NodeManager from " + host
+ " doesn't satisfy minimum allocations, Sending SHUTDOWN"
+ " signal to the NodeManager.");
String message =
"NodeManager from " + host
+ " doesn't satisfy minimum allocations, Sending SHUTDOWN"
+ " signal to the NodeManager.";
LOG.info(message);
response.setDiagnosticsMessage(message);
response.setNodeAction(NodeAction.SHUTDOWN);
return response;
}
@ -214,10 +220,11 @@ public class ResourceTrackerService extends AbstractService implements
this.nmLivelinessMonitor.register(nodeId);
LOG.info("NodeManager from node " + host + "(cmPort: " + cmPort
+ " httpPort: " + httpPort + ") " + "registered with capability: "
+ capability + ", assigned nodeId " + nodeId);
String message =
"NodeManager from node " + host + "(cmPort: " + cmPort + " httpPort: "
+ httpPort + ") " + "registered with capability: " + capability
+ ", assigned nodeId " + nodeId;
LOG.info(message);
response.setNodeAction(NodeAction.NORMAL);
response.setRMIdentifier(ResourceManager.clusterTimeStamp);
return response;
@ -243,7 +250,9 @@ public class ResourceTrackerService extends AbstractService implements
RMNode rmNode = this.rmContext.getRMNodes().get(nodeId);
if (rmNode == null) {
/* node does not exist */
LOG.info("Node not found rebooting " + remoteNodeStatus.getNodeId());
String message = "Node not found rebooting " + remoteNodeStatus.getNodeId();
LOG.info(message);
resync.setDiagnosticsMessage(message);
return resync;
}
@ -252,8 +261,11 @@ public class ResourceTrackerService extends AbstractService implements
// 2. Check if it's a valid (i.e. not excluded) node
if (!this.nodesListManager.isValidNode(rmNode.getHostName())) {
LOG.info("Disallowed NodeManager nodeId: " + nodeId + " hostname: "
+ rmNode.getNodeAddress());
String message =
"Disallowed NodeManager nodeId: " + nodeId + " hostname: "
+ rmNode.getNodeAddress();
LOG.info(message);
shutDown.setDiagnosticsMessage(message);
this.rmContext.getDispatcher().getEventHandler().handle(
new RMNodeEvent(nodeId, RMNodeEventType.DECOMMISSION));
return shutDown;
@ -268,9 +280,12 @@ public class ResourceTrackerService extends AbstractService implements
return lastNodeHeartbeatResponse;
} else if (remoteNodeStatus.getResponseId() + 1 < lastNodeHeartbeatResponse
.getResponseId()) {
LOG.info("Too far behind rm response id:"
+ lastNodeHeartbeatResponse.getResponseId() + " nm response id:"
+ remoteNodeStatus.getResponseId());
String message =
"Too far behind rm response id:"
+ lastNodeHeartbeatResponse.getResponseId() + " nm response id:"
+ remoteNodeStatus.getResponseId();
LOG.info(message);
resync.setDiagnosticsMessage(message);
// TODO: Just sending reboot is not enough. Think more.
this.rmContext.getDispatcher().getEventHandler().handle(
new RMNodeEvent(nodeId, RMNodeEventType.REBOOTING));

View File

@ -268,6 +268,10 @@ public class TestResourceTrackerService {
// trying to register a invalid node.
RegisterNodeManagerResponse response = resourceTrackerService.registerNodeManager(req);
Assert.assertEquals(NodeAction.SHUTDOWN,response.getNodeAction());
Assert
.assertEquals(
"Disallowed NodeManager from host2, Sending SHUTDOWN signal to the NodeManager.",
response.getDiagnosticsMessage());
}
@Test
@ -344,6 +348,8 @@ public class TestResourceTrackerService {
nodeHeartbeat = nm2.nodeHeartbeat(
new HashMap<ApplicationId, List<ContainerStatus>>(), true, -100);
Assert.assertTrue(NodeAction.RESYNC.equals(nodeHeartbeat.getNodeAction()));
Assert.assertEquals("Too far behind rm response id:0 nm response id:-100",
nodeHeartbeat.getDiagnosticsMessage());
checkRebootedNMCount(rm, ++initialMetricCount);
}

View File

@ -133,5 +133,7 @@ public class TestRMNMRPCResponseId {
nodeStatus.setResponseId(0);
response = resourceTrackerService.nodeHeartbeat(nodeHeartBeatRequest);
Assert.assertTrue(NodeAction.RESYNC.equals(response.getNodeAction()));
Assert.assertEquals("Too far behind rm response id:2 nm response id:0",
response.getDiagnosticsMessage());
}
}