YARN-590. Added an optional mesage to be returned by ResourceMaanger when RM asks an RM to shutdown/resync etc so that NMs can log this message locally for better debuggability. Contributed by Mayank Bansal.
svn merge --ignore-ancestry -c 1481234 ../../trunk/ git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1481235 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
0abf49936a
commit
af10c356bc
|
@ -171,6 +171,10 @@ Release 2.0.5-beta - UNRELEASED
|
||||||
YARN-663. Changed ResourceTracker API and LocalizationProtocol API to throw
|
YARN-663. Changed ResourceTracker API and LocalizationProtocol API to throw
|
||||||
YarnRemoteException and IOException. (Xuan Gong via vinodkv)
|
YarnRemoteException and IOException. (Xuan Gong via vinodkv)
|
||||||
|
|
||||||
|
YARN-590. Added an optional mesage to be returned by ResourceMaanger when RM
|
||||||
|
asks an RM to shutdown/resync etc so that NMs can log this message locally
|
||||||
|
for better debuggability. (Mayank Bansal via vinodkv)
|
||||||
|
|
||||||
OPTIMIZATIONS
|
OPTIMIZATIONS
|
||||||
|
|
||||||
BUG FIXES
|
BUG FIXES
|
||||||
|
|
|
@ -45,4 +45,8 @@ public interface NodeHeartbeatResponse {
|
||||||
|
|
||||||
long getNextHeartBeatInterval();
|
long getNextHeartBeatInterval();
|
||||||
void setNextHeartBeatInterval(long nextHeartBeatInterval);
|
void setNextHeartBeatInterval(long nextHeartBeatInterval);
|
||||||
|
|
||||||
|
String getDiagnosticsMessage();
|
||||||
|
|
||||||
|
void setDiagnosticsMessage(String diagnosticsMessage);
|
||||||
}
|
}
|
||||||
|
|
|
@ -33,4 +33,9 @@ public interface RegisterNodeManagerResponse {
|
||||||
long getRMIdentifier();
|
long getRMIdentifier();
|
||||||
|
|
||||||
void setRMIdentifier(long rmIdentifier);
|
void setRMIdentifier(long rmIdentifier);
|
||||||
|
|
||||||
|
String getDiagnosticsMessage();
|
||||||
|
|
||||||
|
void setDiagnosticsMessage(String diagnosticsMessage);
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -145,6 +145,25 @@ public class NodeHeartbeatResponsePBImpl extends ProtoBase<NodeHeartbeatResponse
|
||||||
builder.setNodeAction(convertToProtoFormat(nodeAction));
|
builder.setNodeAction(convertToProtoFormat(nodeAction));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public String getDiagnosticsMessage() {
|
||||||
|
NodeHeartbeatResponseProtoOrBuilder p = viaProto ? proto : builder;
|
||||||
|
if (!p.hasDiagnosticsMessage()) {
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
return p.getDiagnosticsMessage();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void setDiagnosticsMessage(String diagnosticsMessage) {
|
||||||
|
maybeInitBuilder();
|
||||||
|
if (diagnosticsMessage == null) {
|
||||||
|
builder.clearDiagnosticsMessage();
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
builder.setDiagnosticsMessage((diagnosticsMessage));
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public List<ContainerId> getContainersToCleanup() {
|
public List<ContainerId> getContainersToCleanup() {
|
||||||
initContainersToCleanup();
|
initContainersToCleanup();
|
||||||
|
|
|
@ -101,6 +101,25 @@ public class RegisterNodeManagerResponsePBImpl extends ProtoBase<RegisterNodeMan
|
||||||
rebuild = true;
|
rebuild = true;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public String getDiagnosticsMessage() {
|
||||||
|
RegisterNodeManagerResponseProtoOrBuilder p = viaProto ? proto : builder;
|
||||||
|
if (!p.hasDiagnosticsMessage()) {
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
return p.getDiagnosticsMessage();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void setDiagnosticsMessage(String diagnosticsMessage) {
|
||||||
|
maybeInitBuilder();
|
||||||
|
if (diagnosticsMessage == null) {
|
||||||
|
builder.clearDiagnosticsMessage();
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
builder.setDiagnosticsMessage((diagnosticsMessage));
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public NodeAction getNodeAction() {
|
public NodeAction getNodeAction() {
|
||||||
RegisterNodeManagerResponseProtoOrBuilder p = viaProto ? proto : builder;
|
RegisterNodeManagerResponseProtoOrBuilder p = viaProto ? proto : builder;
|
||||||
|
|
|
@ -34,6 +34,7 @@ message RegisterNodeManagerResponseProto {
|
||||||
optional MasterKeyProto master_key = 1;
|
optional MasterKeyProto master_key = 1;
|
||||||
optional NodeActionProto nodeAction = 2;
|
optional NodeActionProto nodeAction = 2;
|
||||||
optional int64 rm_identifier = 3;
|
optional int64 rm_identifier = 3;
|
||||||
|
optional string diagnostics_message = 4;
|
||||||
}
|
}
|
||||||
|
|
||||||
message NodeHeartbeatRequestProto {
|
message NodeHeartbeatRequestProto {
|
||||||
|
@ -49,4 +50,5 @@ message NodeHeartbeatResponseProto {
|
||||||
repeated ContainerIdProto containers_to_cleanup = 4;
|
repeated ContainerIdProto containers_to_cleanup = 4;
|
||||||
repeated ApplicationIdProto applications_to_cleanup = 5;
|
repeated ApplicationIdProto applications_to_cleanup = 5;
|
||||||
optional int64 nextHeartBeatInterval = 6;
|
optional int64 nextHeartBeatInterval = 6;
|
||||||
|
optional string diagnostics_message = 7;
|
||||||
}
|
}
|
||||||
|
|
|
@ -295,8 +295,12 @@ public class NodeStatusUpdaterImpl extends AbstractService implements
|
||||||
}
|
}
|
||||||
// if the Resourcemanager instructs NM to shutdown.
|
// if the Resourcemanager instructs NM to shutdown.
|
||||||
if (NodeAction.SHUTDOWN.equals(regNMResponse.getNodeAction())) {
|
if (NodeAction.SHUTDOWN.equals(regNMResponse.getNodeAction())) {
|
||||||
|
String message =
|
||||||
|
"Message from ResourceManager: "
|
||||||
|
+ regNMResponse.getDiagnosticsMessage();
|
||||||
throw new YarnException(
|
throw new YarnException(
|
||||||
"Recieved SHUTDOWN signal from Resourcemanager ,Registration of NodeManager failed");
|
"Recieved SHUTDOWN signal from Resourcemanager ,Registration of NodeManager failed, "
|
||||||
|
+ message);
|
||||||
}
|
}
|
||||||
|
|
||||||
if (UserGroupInformation.isSecurityEnabled()) {
|
if (UserGroupInformation.isSecurityEnabled()) {
|
||||||
|
@ -482,15 +486,19 @@ public class NodeStatusUpdaterImpl extends AbstractService implements
|
||||||
|
|
||||||
if (response.getNodeAction() == NodeAction.SHUTDOWN) {
|
if (response.getNodeAction() == NodeAction.SHUTDOWN) {
|
||||||
LOG
|
LOG
|
||||||
.info("Recieved SHUTDOWN signal from Resourcemanager as part of heartbeat," +
|
.warn("Recieved SHUTDOWN signal from Resourcemanager as part of heartbeat,"
|
||||||
" hence shutting down.");
|
+ " hence shutting down.");
|
||||||
|
LOG.warn("Message from ResourceManager: "
|
||||||
|
+ response.getDiagnosticsMessage());
|
||||||
dispatcher.getEventHandler().handle(
|
dispatcher.getEventHandler().handle(
|
||||||
new NodeManagerEvent(NodeManagerEventType.SHUTDOWN));
|
new NodeManagerEvent(NodeManagerEventType.SHUTDOWN));
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
if (response.getNodeAction() == NodeAction.RESYNC) {
|
if (response.getNodeAction() == NodeAction.RESYNC) {
|
||||||
LOG.info("Node is out of sync with ResourceManager,"
|
LOG.warn("Node is out of sync with ResourceManager,"
|
||||||
+ " hence rebooting.");
|
+ " hence rebooting.");
|
||||||
|
LOG.warn("Message from ResourceManager: "
|
||||||
|
+ response.getDiagnosticsMessage());
|
||||||
// Invalidate the RMIdentifier while resync
|
// Invalidate the RMIdentifier while resync
|
||||||
NodeStatusUpdaterImpl.this.rmIdentifier =
|
NodeStatusUpdaterImpl.this.rmIdentifier =
|
||||||
ResourceManagerConstants.RM_INVALID_IDENTIFIER;
|
ResourceManagerConstants.RM_INVALID_IDENTIFIER;
|
||||||
|
|
|
@ -388,6 +388,7 @@ public class TestNodeStatusUpdater {
|
||||||
private class MyResourceTracker2 implements ResourceTracker {
|
private class MyResourceTracker2 implements ResourceTracker {
|
||||||
public NodeAction heartBeatNodeAction = NodeAction.NORMAL;
|
public NodeAction heartBeatNodeAction = NodeAction.NORMAL;
|
||||||
public NodeAction registerNodeAction = NodeAction.NORMAL;
|
public NodeAction registerNodeAction = NodeAction.NORMAL;
|
||||||
|
public String shutDownMessage = "";
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public RegisterNodeManagerResponse registerNodeManager(
|
public RegisterNodeManagerResponse registerNodeManager(
|
||||||
|
@ -397,6 +398,7 @@ public class TestNodeStatusUpdater {
|
||||||
RegisterNodeManagerResponse response = recordFactory
|
RegisterNodeManagerResponse response = recordFactory
|
||||||
.newRecordInstance(RegisterNodeManagerResponse.class);
|
.newRecordInstance(RegisterNodeManagerResponse.class);
|
||||||
response.setNodeAction(registerNodeAction );
|
response.setNodeAction(registerNodeAction );
|
||||||
|
response.setDiagnosticsMessage(shutDownMessage);
|
||||||
return response;
|
return response;
|
||||||
}
|
}
|
||||||
@Override
|
@Override
|
||||||
|
@ -408,6 +410,7 @@ public class TestNodeStatusUpdater {
|
||||||
NodeHeartbeatResponse nhResponse = YarnServerBuilderUtils.
|
NodeHeartbeatResponse nhResponse = YarnServerBuilderUtils.
|
||||||
newNodeHeartbeatResponse(heartBeatID, heartBeatNodeAction, null,
|
newNodeHeartbeatResponse(heartBeatID, heartBeatNodeAction, null,
|
||||||
null, null, 1000L);
|
null, null, 1000L);
|
||||||
|
nhResponse.setDiagnosticsMessage(shutDownMessage);
|
||||||
return nhResponse;
|
return nhResponse;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -737,12 +740,15 @@ public class TestNodeStatusUpdater {
|
||||||
context, dispatcher, healthChecker, metrics);
|
context, dispatcher, healthChecker, metrics);
|
||||||
MyResourceTracker2 myResourceTracker2 = new MyResourceTracker2();
|
MyResourceTracker2 myResourceTracker2 = new MyResourceTracker2();
|
||||||
myResourceTracker2.registerNodeAction = NodeAction.SHUTDOWN;
|
myResourceTracker2.registerNodeAction = NodeAction.SHUTDOWN;
|
||||||
|
myResourceTracker2.shutDownMessage = "RM Shutting Down Node";
|
||||||
nodeStatusUpdater.resourceTracker = myResourceTracker2;
|
nodeStatusUpdater.resourceTracker = myResourceTracker2;
|
||||||
return nodeStatusUpdater;
|
return nodeStatusUpdater;
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
verifyNodeStartFailure("org.apache.hadoop.yarn.YarnException: "
|
verifyNodeStartFailure("org.apache.hadoop.yarn.YarnException: "
|
||||||
+ "Recieved SHUTDOWN signal from Resourcemanager ,Registration of NodeManager failed");
|
+ "Recieved SHUTDOWN signal from Resourcemanager ,"
|
||||||
|
+ "Registration of NodeManager failed, "
|
||||||
|
+ "Message from ResourceManager: RM Shutting Down Node");
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test (timeout = 15000)
|
@Test (timeout = 15000)
|
||||||
|
|
|
@ -176,8 +176,11 @@ public class ResourceTrackerService extends AbstractService implements
|
||||||
|
|
||||||
// Check if this node is a 'valid' node
|
// Check if this node is a 'valid' node
|
||||||
if (!this.nodesListManager.isValidNode(host)) {
|
if (!this.nodesListManager.isValidNode(host)) {
|
||||||
LOG.info("Disallowed NodeManager from " + host
|
String message =
|
||||||
+ ", Sending SHUTDOWN signal to the NodeManager.");
|
"Disallowed NodeManager from " + host
|
||||||
|
+ ", Sending SHUTDOWN signal to the NodeManager.";
|
||||||
|
LOG.info(message);
|
||||||
|
response.setDiagnosticsMessage(message);
|
||||||
response.setNodeAction(NodeAction.SHUTDOWN);
|
response.setNodeAction(NodeAction.SHUTDOWN);
|
||||||
return response;
|
return response;
|
||||||
}
|
}
|
||||||
|
@ -185,9 +188,12 @@ public class ResourceTrackerService extends AbstractService implements
|
||||||
// Check if this node has minimum allocations
|
// Check if this node has minimum allocations
|
||||||
if (capability.getMemory() < minAllocMb
|
if (capability.getMemory() < minAllocMb
|
||||||
|| capability.getVirtualCores() < minAllocVcores) {
|
|| capability.getVirtualCores() < minAllocVcores) {
|
||||||
LOG.info("NodeManager from " + host
|
String message =
|
||||||
+ " doesn't satisfy minimum allocations, Sending SHUTDOWN"
|
"NodeManager from " + host
|
||||||
+ " signal to the NodeManager.");
|
+ " doesn't satisfy minimum allocations, Sending SHUTDOWN"
|
||||||
|
+ " signal to the NodeManager.";
|
||||||
|
LOG.info(message);
|
||||||
|
response.setDiagnosticsMessage(message);
|
||||||
response.setNodeAction(NodeAction.SHUTDOWN);
|
response.setNodeAction(NodeAction.SHUTDOWN);
|
||||||
return response;
|
return response;
|
||||||
}
|
}
|
||||||
|
@ -214,10 +220,11 @@ public class ResourceTrackerService extends AbstractService implements
|
||||||
|
|
||||||
this.nmLivelinessMonitor.register(nodeId);
|
this.nmLivelinessMonitor.register(nodeId);
|
||||||
|
|
||||||
LOG.info("NodeManager from node " + host + "(cmPort: " + cmPort
|
String message =
|
||||||
+ " httpPort: " + httpPort + ") " + "registered with capability: "
|
"NodeManager from node " + host + "(cmPort: " + cmPort + " httpPort: "
|
||||||
+ capability + ", assigned nodeId " + nodeId);
|
+ httpPort + ") " + "registered with capability: " + capability
|
||||||
|
+ ", assigned nodeId " + nodeId;
|
||||||
|
LOG.info(message);
|
||||||
response.setNodeAction(NodeAction.NORMAL);
|
response.setNodeAction(NodeAction.NORMAL);
|
||||||
response.setRMIdentifier(ResourceManager.clusterTimeStamp);
|
response.setRMIdentifier(ResourceManager.clusterTimeStamp);
|
||||||
return response;
|
return response;
|
||||||
|
@ -243,7 +250,9 @@ public class ResourceTrackerService extends AbstractService implements
|
||||||
RMNode rmNode = this.rmContext.getRMNodes().get(nodeId);
|
RMNode rmNode = this.rmContext.getRMNodes().get(nodeId);
|
||||||
if (rmNode == null) {
|
if (rmNode == null) {
|
||||||
/* node does not exist */
|
/* node does not exist */
|
||||||
LOG.info("Node not found rebooting " + remoteNodeStatus.getNodeId());
|
String message = "Node not found rebooting " + remoteNodeStatus.getNodeId();
|
||||||
|
LOG.info(message);
|
||||||
|
resync.setDiagnosticsMessage(message);
|
||||||
return resync;
|
return resync;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -252,8 +261,11 @@ public class ResourceTrackerService extends AbstractService implements
|
||||||
|
|
||||||
// 2. Check if it's a valid (i.e. not excluded) node
|
// 2. Check if it's a valid (i.e. not excluded) node
|
||||||
if (!this.nodesListManager.isValidNode(rmNode.getHostName())) {
|
if (!this.nodesListManager.isValidNode(rmNode.getHostName())) {
|
||||||
LOG.info("Disallowed NodeManager nodeId: " + nodeId + " hostname: "
|
String message =
|
||||||
+ rmNode.getNodeAddress());
|
"Disallowed NodeManager nodeId: " + nodeId + " hostname: "
|
||||||
|
+ rmNode.getNodeAddress();
|
||||||
|
LOG.info(message);
|
||||||
|
shutDown.setDiagnosticsMessage(message);
|
||||||
this.rmContext.getDispatcher().getEventHandler().handle(
|
this.rmContext.getDispatcher().getEventHandler().handle(
|
||||||
new RMNodeEvent(nodeId, RMNodeEventType.DECOMMISSION));
|
new RMNodeEvent(nodeId, RMNodeEventType.DECOMMISSION));
|
||||||
return shutDown;
|
return shutDown;
|
||||||
|
@ -268,9 +280,12 @@ public class ResourceTrackerService extends AbstractService implements
|
||||||
return lastNodeHeartbeatResponse;
|
return lastNodeHeartbeatResponse;
|
||||||
} else if (remoteNodeStatus.getResponseId() + 1 < lastNodeHeartbeatResponse
|
} else if (remoteNodeStatus.getResponseId() + 1 < lastNodeHeartbeatResponse
|
||||||
.getResponseId()) {
|
.getResponseId()) {
|
||||||
LOG.info("Too far behind rm response id:"
|
String message =
|
||||||
+ lastNodeHeartbeatResponse.getResponseId() + " nm response id:"
|
"Too far behind rm response id:"
|
||||||
+ remoteNodeStatus.getResponseId());
|
+ lastNodeHeartbeatResponse.getResponseId() + " nm response id:"
|
||||||
|
+ remoteNodeStatus.getResponseId();
|
||||||
|
LOG.info(message);
|
||||||
|
resync.setDiagnosticsMessage(message);
|
||||||
// TODO: Just sending reboot is not enough. Think more.
|
// TODO: Just sending reboot is not enough. Think more.
|
||||||
this.rmContext.getDispatcher().getEventHandler().handle(
|
this.rmContext.getDispatcher().getEventHandler().handle(
|
||||||
new RMNodeEvent(nodeId, RMNodeEventType.REBOOTING));
|
new RMNodeEvent(nodeId, RMNodeEventType.REBOOTING));
|
||||||
|
|
|
@ -268,6 +268,10 @@ public class TestResourceTrackerService {
|
||||||
// trying to register a invalid node.
|
// trying to register a invalid node.
|
||||||
RegisterNodeManagerResponse response = resourceTrackerService.registerNodeManager(req);
|
RegisterNodeManagerResponse response = resourceTrackerService.registerNodeManager(req);
|
||||||
Assert.assertEquals(NodeAction.SHUTDOWN,response.getNodeAction());
|
Assert.assertEquals(NodeAction.SHUTDOWN,response.getNodeAction());
|
||||||
|
Assert
|
||||||
|
.assertEquals(
|
||||||
|
"Disallowed NodeManager from host2, Sending SHUTDOWN signal to the NodeManager.",
|
||||||
|
response.getDiagnosticsMessage());
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
|
@ -344,6 +348,8 @@ public class TestResourceTrackerService {
|
||||||
nodeHeartbeat = nm2.nodeHeartbeat(
|
nodeHeartbeat = nm2.nodeHeartbeat(
|
||||||
new HashMap<ApplicationId, List<ContainerStatus>>(), true, -100);
|
new HashMap<ApplicationId, List<ContainerStatus>>(), true, -100);
|
||||||
Assert.assertTrue(NodeAction.RESYNC.equals(nodeHeartbeat.getNodeAction()));
|
Assert.assertTrue(NodeAction.RESYNC.equals(nodeHeartbeat.getNodeAction()));
|
||||||
|
Assert.assertEquals("Too far behind rm response id:0 nm response id:-100",
|
||||||
|
nodeHeartbeat.getDiagnosticsMessage());
|
||||||
checkRebootedNMCount(rm, ++initialMetricCount);
|
checkRebootedNMCount(rm, ++initialMetricCount);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -133,5 +133,7 @@ public class TestRMNMRPCResponseId {
|
||||||
nodeStatus.setResponseId(0);
|
nodeStatus.setResponseId(0);
|
||||||
response = resourceTrackerService.nodeHeartbeat(nodeHeartBeatRequest);
|
response = resourceTrackerService.nodeHeartbeat(nodeHeartBeatRequest);
|
||||||
Assert.assertTrue(NodeAction.RESYNC.equals(response.getNodeAction()));
|
Assert.assertTrue(NodeAction.RESYNC.equals(response.getNodeAction()));
|
||||||
|
Assert.assertEquals("Too far behind rm response id:2 nm response id:0",
|
||||||
|
response.getDiagnosticsMessage());
|
||||||
}
|
}
|
||||||
}
|
}
|
Loading…
Reference in New Issue