YARN-4055. Report node resource utilization in heartbeat. (Inigo Goiri via kasha)
(cherry picked from commit 13604bd5f1
)
This commit is contained in:
parent
cd0c6b5789
commit
b567aa2b4f
|
@ -117,6 +117,9 @@ Release 2.8.0 - UNRELEASED
|
|||
|
||||
YARN-3534. Collect memory/cpu usage on the node. (Inigo Goiri via kasha)
|
||||
|
||||
YARN-4055. Report node resource utilization in heartbeat.
|
||||
(Inigo Goiri via kasha)
|
||||
|
||||
IMPROVEMENTS
|
||||
|
||||
YARN-644. Basic null check is not performed on passed in arguments before
|
||||
|
|
|
@ -68,7 +68,7 @@ public class TestResourceTrackerOnHA extends ProtocolHATestBase{
|
|||
failoverThread = createAndStartFailoverThread();
|
||||
NodeStatus status =
|
||||
NodeStatus.newInstance(NodeId.newInstance("localhost", 0), 0, null,
|
||||
null, null, null);
|
||||
null, null, null, null);
|
||||
NodeHeartbeatRequest request2 =
|
||||
NodeHeartbeatRequest.newInstance(status, null, null,null);
|
||||
resourceTracker.nodeHeartbeat(request2);
|
||||
|
|
|
@ -47,13 +47,15 @@ public abstract class NodeStatus {
|
|||
* @param keepAliveApplications Applications to keep alive.
|
||||
* @param nodeHealthStatus Health status of the node.
|
||||
* @param containersUtilizations Utilization of the containers in this node.
|
||||
* @param nodeUtilization Utilization of the node.
|
||||
* @return New {@code NodeStatus} with the provided information.
|
||||
*/
|
||||
public static NodeStatus newInstance(NodeId nodeId, int responseId,
|
||||
List<ContainerStatus> containerStatuses,
|
||||
List<ApplicationId> keepAliveApplications,
|
||||
NodeHealthStatus nodeHealthStatus,
|
||||
ResourceUtilization containersUtilization) {
|
||||
ResourceUtilization containersUtilization,
|
||||
ResourceUtilization nodeUtilization) {
|
||||
NodeStatus nodeStatus = Records.newRecord(NodeStatus.class);
|
||||
nodeStatus.setResponseId(responseId);
|
||||
nodeStatus.setNodeId(nodeId);
|
||||
|
@ -61,6 +63,7 @@ public abstract class NodeStatus {
|
|||
nodeStatus.setKeepAliveApplications(keepAliveApplications);
|
||||
nodeStatus.setNodeHealthStatus(nodeHealthStatus);
|
||||
nodeStatus.setContainersUtilization(containersUtilization);
|
||||
nodeStatus.setNodeUtilization(nodeUtilization);
|
||||
return nodeStatus;
|
||||
}
|
||||
|
||||
|
@ -92,4 +95,17 @@ public abstract class NodeStatus {
|
|||
@Unstable
|
||||
public abstract void setContainersUtilization(
|
||||
ResourceUtilization containersUtilization);
|
||||
|
||||
/**
|
||||
* Get the <em>resource utilization</em> of the node.
|
||||
* @return <em>resource utilization</em> of the node
|
||||
*/
|
||||
@Public
|
||||
@Stable
|
||||
public abstract ResourceUtilization getNodeUtilization();
|
||||
|
||||
@Private
|
||||
@Unstable
|
||||
public abstract void setNodeUtilization(
|
||||
ResourceUtilization nodeUtilization);
|
||||
}
|
||||
|
|
|
@ -314,6 +314,28 @@ public class NodeStatusPBImpl extends NodeStatus {
|
|||
.setContainersUtilization(convertToProtoFormat(containersUtilization));
|
||||
}
|
||||
|
||||
@Override
|
||||
public ResourceUtilization getNodeUtilization() {
|
||||
NodeStatusProtoOrBuilder p =
|
||||
this.viaProto ? this.proto : this.builder;
|
||||
if (!p.hasNodeUtilization()) {
|
||||
return null;
|
||||
}
|
||||
return convertFromProtoFormat(p.getNodeUtilization());
|
||||
}
|
||||
|
||||
@Override
|
||||
public void setNodeUtilization(
|
||||
ResourceUtilization nodeUtilization) {
|
||||
maybeInitBuilder();
|
||||
if (nodeUtilization == null) {
|
||||
this.builder.clearNodeUtilization();
|
||||
return;
|
||||
}
|
||||
this.builder
|
||||
.setNodeUtilization(convertToProtoFormat(nodeUtilization));
|
||||
}
|
||||
|
||||
private NodeIdProto convertToProtoFormat(NodeId nodeId) {
|
||||
return ((NodeIdPBImpl)nodeId).getProto();
|
||||
}
|
||||
|
|
|
@ -37,6 +37,7 @@ message NodeStatusProto {
|
|||
optional NodeHealthStatusProto nodeHealthStatus = 4;
|
||||
repeated ApplicationIdProto keep_alive_applications = 5;
|
||||
optional ResourceUtilizationProto containers_utilization = 6;
|
||||
optional ResourceUtilizationProto node_utilization = 7;
|
||||
}
|
||||
|
||||
message MasterKeyProto {
|
||||
|
|
|
@ -70,6 +70,8 @@ public interface Context {
|
|||
|
||||
ContainerManagementProtocol getContainerManager();
|
||||
|
||||
NodeResourceMonitor getNodeResourceMonitor();
|
||||
|
||||
LocalDirsHandlerService getLocalDirsHandler();
|
||||
|
||||
ApplicationACLsManager getApplicationACLsManager();
|
||||
|
|
|
@ -93,7 +93,8 @@ public class NodeManager extends CompositeService
|
|||
private AsyncDispatcher dispatcher;
|
||||
private ContainerManagerImpl containerManager;
|
||||
private NodeStatusUpdater nodeStatusUpdater;
|
||||
private static CompositeServiceShutdownHook nodeManagerShutdownHook;
|
||||
private NodeResourceMonitor nodeResourceMonitor;
|
||||
private static CompositeServiceShutdownHook nodeManagerShutdownHook;
|
||||
private NMStateStoreService nmStore = null;
|
||||
|
||||
private AtomicBoolean isStopping = new AtomicBoolean(false);
|
||||
|
@ -292,8 +293,9 @@ public class NodeManager extends CompositeService
|
|||
nodeLabelsProvider);
|
||||
}
|
||||
|
||||
NodeResourceMonitor nodeResourceMonitor = createNodeResourceMonitor();
|
||||
nodeResourceMonitor = createNodeResourceMonitor();
|
||||
addService(nodeResourceMonitor);
|
||||
((NMContext) context).setNodeResourceMonitor(nodeResourceMonitor);
|
||||
|
||||
containerManager =
|
||||
createContainerManager(context, exec, del, nodeStatusUpdater,
|
||||
|
@ -413,6 +415,7 @@ public class NodeManager extends CompositeService
|
|||
private final NMContainerTokenSecretManager containerTokenSecretManager;
|
||||
private final NMTokenSecretManagerInNM nmTokenSecretManager;
|
||||
private ContainerManagementProtocol containerManager;
|
||||
private NodeResourceMonitor nodeResourceMonitor;
|
||||
private final LocalDirsHandlerService dirsHandler;
|
||||
private final ApplicationACLsManager aclsManager;
|
||||
private WebServer webServer;
|
||||
|
@ -477,6 +480,15 @@ public class NodeManager extends CompositeService
|
|||
return this.nodeHealthStatus;
|
||||
}
|
||||
|
||||
@Override
|
||||
public NodeResourceMonitor getNodeResourceMonitor() {
|
||||
return this.nodeResourceMonitor;
|
||||
}
|
||||
|
||||
public void setNodeResourceMonitor(NodeResourceMonitor nodeResourceMonitor) {
|
||||
this.nodeResourceMonitor = nodeResourceMonitor;
|
||||
}
|
||||
|
||||
@Override
|
||||
public ContainerManagementProtocol getContainerManager() {
|
||||
return this.containerManager;
|
||||
|
|
|
@ -431,10 +431,11 @@ public class NodeStatusUpdaterImpl extends AbstractService implements
|
|||
}
|
||||
List<ContainerStatus> containersStatuses = getContainerStatuses();
|
||||
ResourceUtilization containersUtilization = getContainersUtilization();
|
||||
ResourceUtilization nodeUtilization = getNodeUtilization();
|
||||
NodeStatus nodeStatus =
|
||||
NodeStatus.newInstance(nodeId, responseId, containersStatuses,
|
||||
createKeepAliveApplicationList(), nodeHealthStatus,
|
||||
containersUtilization);
|
||||
containersUtilization, nodeUtilization);
|
||||
|
||||
return nodeStatus;
|
||||
}
|
||||
|
@ -451,6 +452,16 @@ public class NodeStatusUpdaterImpl extends AbstractService implements
|
|||
return containersMonitor.getContainersUtilization();
|
||||
}
|
||||
|
||||
/**
|
||||
* Get the utilization of the node. This includes the containers.
|
||||
* @return Resource utilization of the node.
|
||||
*/
|
||||
private ResourceUtilization getNodeUtilization() {
|
||||
NodeResourceMonitorImpl nodeResourceMonitor =
|
||||
(NodeResourceMonitorImpl) this.context.getNodeResourceMonitor();
|
||||
return nodeResourceMonitor.getUtilization();
|
||||
}
|
||||
|
||||
// Iterate through the NMContext and clone and get all the containers'
|
||||
// statuses. If it's a completed container, add into the
|
||||
// recentlyStoppedContainers collections.
|
||||
|
|
Loading…
Reference in New Issue