YARN-4055. Report node resource utilization in heartbeat. (Inigo Goiri via kasha)
This commit is contained in:
parent
def12933b3
commit
13604bd5f1
|
@ -172,6 +172,9 @@ Release 2.8.0 - UNRELEASED
|
||||||
|
|
||||||
YARN-3534. Collect memory/cpu usage on the node. (Inigo Goiri via kasha)
|
YARN-3534. Collect memory/cpu usage on the node. (Inigo Goiri via kasha)
|
||||||
|
|
||||||
|
YARN-4055. Report node resource utilization in heartbeat.
|
||||||
|
(Inigo Goiri via kasha)
|
||||||
|
|
||||||
IMPROVEMENTS
|
IMPROVEMENTS
|
||||||
|
|
||||||
YARN-644. Basic null check is not performed on passed in arguments before
|
YARN-644. Basic null check is not performed on passed in arguments before
|
||||||
|
|
|
@ -68,7 +68,7 @@ public class TestResourceTrackerOnHA extends ProtocolHATestBase{
|
||||||
failoverThread = createAndStartFailoverThread();
|
failoverThread = createAndStartFailoverThread();
|
||||||
NodeStatus status =
|
NodeStatus status =
|
||||||
NodeStatus.newInstance(NodeId.newInstance("localhost", 0), 0, null,
|
NodeStatus.newInstance(NodeId.newInstance("localhost", 0), 0, null,
|
||||||
null, null, null);
|
null, null, null, null);
|
||||||
NodeHeartbeatRequest request2 =
|
NodeHeartbeatRequest request2 =
|
||||||
NodeHeartbeatRequest.newInstance(status, null, null,null);
|
NodeHeartbeatRequest.newInstance(status, null, null,null);
|
||||||
resourceTracker.nodeHeartbeat(request2);
|
resourceTracker.nodeHeartbeat(request2);
|
||||||
|
|
|
@ -47,13 +47,15 @@ public abstract class NodeStatus {
|
||||||
* @param keepAliveApplications Applications to keep alive.
|
* @param keepAliveApplications Applications to keep alive.
|
||||||
* @param nodeHealthStatus Health status of the node.
|
* @param nodeHealthStatus Health status of the node.
|
||||||
* @param containersUtilizations Utilization of the containers in this node.
|
* @param containersUtilizations Utilization of the containers in this node.
|
||||||
|
* @param nodeUtilization Utilization of the node.
|
||||||
* @return New {@code NodeStatus} with the provided information.
|
* @return New {@code NodeStatus} with the provided information.
|
||||||
*/
|
*/
|
||||||
public static NodeStatus newInstance(NodeId nodeId, int responseId,
|
public static NodeStatus newInstance(NodeId nodeId, int responseId,
|
||||||
List<ContainerStatus> containerStatuses,
|
List<ContainerStatus> containerStatuses,
|
||||||
List<ApplicationId> keepAliveApplications,
|
List<ApplicationId> keepAliveApplications,
|
||||||
NodeHealthStatus nodeHealthStatus,
|
NodeHealthStatus nodeHealthStatus,
|
||||||
ResourceUtilization containersUtilization) {
|
ResourceUtilization containersUtilization,
|
||||||
|
ResourceUtilization nodeUtilization) {
|
||||||
NodeStatus nodeStatus = Records.newRecord(NodeStatus.class);
|
NodeStatus nodeStatus = Records.newRecord(NodeStatus.class);
|
||||||
nodeStatus.setResponseId(responseId);
|
nodeStatus.setResponseId(responseId);
|
||||||
nodeStatus.setNodeId(nodeId);
|
nodeStatus.setNodeId(nodeId);
|
||||||
|
@ -61,6 +63,7 @@ public abstract class NodeStatus {
|
||||||
nodeStatus.setKeepAliveApplications(keepAliveApplications);
|
nodeStatus.setKeepAliveApplications(keepAliveApplications);
|
||||||
nodeStatus.setNodeHealthStatus(nodeHealthStatus);
|
nodeStatus.setNodeHealthStatus(nodeHealthStatus);
|
||||||
nodeStatus.setContainersUtilization(containersUtilization);
|
nodeStatus.setContainersUtilization(containersUtilization);
|
||||||
|
nodeStatus.setNodeUtilization(nodeUtilization);
|
||||||
return nodeStatus;
|
return nodeStatus;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -92,4 +95,17 @@ public abstract class NodeStatus {
|
||||||
@Unstable
|
@Unstable
|
||||||
public abstract void setContainersUtilization(
|
public abstract void setContainersUtilization(
|
||||||
ResourceUtilization containersUtilization);
|
ResourceUtilization containersUtilization);
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Get the <em>resource utilization</em> of the node.
|
||||||
|
* @return <em>resource utilization</em> of the node
|
||||||
|
*/
|
||||||
|
@Public
|
||||||
|
@Stable
|
||||||
|
public abstract ResourceUtilization getNodeUtilization();
|
||||||
|
|
||||||
|
@Private
|
||||||
|
@Unstable
|
||||||
|
public abstract void setNodeUtilization(
|
||||||
|
ResourceUtilization nodeUtilization);
|
||||||
}
|
}
|
||||||
|
|
|
@ -314,6 +314,28 @@ public class NodeStatusPBImpl extends NodeStatus {
|
||||||
.setContainersUtilization(convertToProtoFormat(containersUtilization));
|
.setContainersUtilization(convertToProtoFormat(containersUtilization));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public ResourceUtilization getNodeUtilization() {
|
||||||
|
NodeStatusProtoOrBuilder p =
|
||||||
|
this.viaProto ? this.proto : this.builder;
|
||||||
|
if (!p.hasNodeUtilization()) {
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
return convertFromProtoFormat(p.getNodeUtilization());
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void setNodeUtilization(
|
||||||
|
ResourceUtilization nodeUtilization) {
|
||||||
|
maybeInitBuilder();
|
||||||
|
if (nodeUtilization == null) {
|
||||||
|
this.builder.clearNodeUtilization();
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
this.builder
|
||||||
|
.setNodeUtilization(convertToProtoFormat(nodeUtilization));
|
||||||
|
}
|
||||||
|
|
||||||
private NodeIdProto convertToProtoFormat(NodeId nodeId) {
|
private NodeIdProto convertToProtoFormat(NodeId nodeId) {
|
||||||
return ((NodeIdPBImpl)nodeId).getProto();
|
return ((NodeIdPBImpl)nodeId).getProto();
|
||||||
}
|
}
|
||||||
|
|
|
@ -37,6 +37,7 @@ message NodeStatusProto {
|
||||||
optional NodeHealthStatusProto nodeHealthStatus = 4;
|
optional NodeHealthStatusProto nodeHealthStatus = 4;
|
||||||
repeated ApplicationIdProto keep_alive_applications = 5;
|
repeated ApplicationIdProto keep_alive_applications = 5;
|
||||||
optional ResourceUtilizationProto containers_utilization = 6;
|
optional ResourceUtilizationProto containers_utilization = 6;
|
||||||
|
optional ResourceUtilizationProto node_utilization = 7;
|
||||||
}
|
}
|
||||||
|
|
||||||
message MasterKeyProto {
|
message MasterKeyProto {
|
||||||
|
|
|
@ -70,6 +70,8 @@ public interface Context {
|
||||||
|
|
||||||
ContainerManagementProtocol getContainerManager();
|
ContainerManagementProtocol getContainerManager();
|
||||||
|
|
||||||
|
NodeResourceMonitor getNodeResourceMonitor();
|
||||||
|
|
||||||
LocalDirsHandlerService getLocalDirsHandler();
|
LocalDirsHandlerService getLocalDirsHandler();
|
||||||
|
|
||||||
ApplicationACLsManager getApplicationACLsManager();
|
ApplicationACLsManager getApplicationACLsManager();
|
||||||
|
|
|
@ -93,7 +93,8 @@ public class NodeManager extends CompositeService
|
||||||
private AsyncDispatcher dispatcher;
|
private AsyncDispatcher dispatcher;
|
||||||
private ContainerManagerImpl containerManager;
|
private ContainerManagerImpl containerManager;
|
||||||
private NodeStatusUpdater nodeStatusUpdater;
|
private NodeStatusUpdater nodeStatusUpdater;
|
||||||
private static CompositeServiceShutdownHook nodeManagerShutdownHook;
|
private NodeResourceMonitor nodeResourceMonitor;
|
||||||
|
private static CompositeServiceShutdownHook nodeManagerShutdownHook;
|
||||||
private NMStateStoreService nmStore = null;
|
private NMStateStoreService nmStore = null;
|
||||||
|
|
||||||
private AtomicBoolean isStopping = new AtomicBoolean(false);
|
private AtomicBoolean isStopping = new AtomicBoolean(false);
|
||||||
|
@ -292,8 +293,9 @@ public class NodeManager extends CompositeService
|
||||||
nodeLabelsProvider);
|
nodeLabelsProvider);
|
||||||
}
|
}
|
||||||
|
|
||||||
NodeResourceMonitor nodeResourceMonitor = createNodeResourceMonitor();
|
nodeResourceMonitor = createNodeResourceMonitor();
|
||||||
addService(nodeResourceMonitor);
|
addService(nodeResourceMonitor);
|
||||||
|
((NMContext) context).setNodeResourceMonitor(nodeResourceMonitor);
|
||||||
|
|
||||||
containerManager =
|
containerManager =
|
||||||
createContainerManager(context, exec, del, nodeStatusUpdater,
|
createContainerManager(context, exec, del, nodeStatusUpdater,
|
||||||
|
@ -413,6 +415,7 @@ public class NodeManager extends CompositeService
|
||||||
private final NMContainerTokenSecretManager containerTokenSecretManager;
|
private final NMContainerTokenSecretManager containerTokenSecretManager;
|
||||||
private final NMTokenSecretManagerInNM nmTokenSecretManager;
|
private final NMTokenSecretManagerInNM nmTokenSecretManager;
|
||||||
private ContainerManagementProtocol containerManager;
|
private ContainerManagementProtocol containerManager;
|
||||||
|
private NodeResourceMonitor nodeResourceMonitor;
|
||||||
private final LocalDirsHandlerService dirsHandler;
|
private final LocalDirsHandlerService dirsHandler;
|
||||||
private final ApplicationACLsManager aclsManager;
|
private final ApplicationACLsManager aclsManager;
|
||||||
private WebServer webServer;
|
private WebServer webServer;
|
||||||
|
@ -477,6 +480,15 @@ public class NodeManager extends CompositeService
|
||||||
return this.nodeHealthStatus;
|
return this.nodeHealthStatus;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public NodeResourceMonitor getNodeResourceMonitor() {
|
||||||
|
return this.nodeResourceMonitor;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setNodeResourceMonitor(NodeResourceMonitor nodeResourceMonitor) {
|
||||||
|
this.nodeResourceMonitor = nodeResourceMonitor;
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public ContainerManagementProtocol getContainerManager() {
|
public ContainerManagementProtocol getContainerManager() {
|
||||||
return this.containerManager;
|
return this.containerManager;
|
||||||
|
|
|
@ -431,10 +431,11 @@ public class NodeStatusUpdaterImpl extends AbstractService implements
|
||||||
}
|
}
|
||||||
List<ContainerStatus> containersStatuses = getContainerStatuses();
|
List<ContainerStatus> containersStatuses = getContainerStatuses();
|
||||||
ResourceUtilization containersUtilization = getContainersUtilization();
|
ResourceUtilization containersUtilization = getContainersUtilization();
|
||||||
|
ResourceUtilization nodeUtilization = getNodeUtilization();
|
||||||
NodeStatus nodeStatus =
|
NodeStatus nodeStatus =
|
||||||
NodeStatus.newInstance(nodeId, responseId, containersStatuses,
|
NodeStatus.newInstance(nodeId, responseId, containersStatuses,
|
||||||
createKeepAliveApplicationList(), nodeHealthStatus,
|
createKeepAliveApplicationList(), nodeHealthStatus,
|
||||||
containersUtilization);
|
containersUtilization, nodeUtilization);
|
||||||
|
|
||||||
return nodeStatus;
|
return nodeStatus;
|
||||||
}
|
}
|
||||||
|
@ -451,6 +452,16 @@ public class NodeStatusUpdaterImpl extends AbstractService implements
|
||||||
return containersMonitor.getContainersUtilization();
|
return containersMonitor.getContainersUtilization();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Get the utilization of the node. This includes the containers.
|
||||||
|
* @return Resource utilization of the node.
|
||||||
|
*/
|
||||||
|
private ResourceUtilization getNodeUtilization() {
|
||||||
|
NodeResourceMonitorImpl nodeResourceMonitor =
|
||||||
|
(NodeResourceMonitorImpl) this.context.getNodeResourceMonitor();
|
||||||
|
return nodeResourceMonitor.getUtilization();
|
||||||
|
}
|
||||||
|
|
||||||
// Iterate through the NMContext and clone and get all the containers'
|
// Iterate through the NMContext and clone and get all the containers'
|
||||||
// statuses. If it's a completed container, add into the
|
// statuses. If it's a completed container, add into the
|
||||||
// recentlyStoppedContainers collections.
|
// recentlyStoppedContainers collections.
|
||||||
|
|
Loading…
Reference in New Issue