YARN-10450. Add cpu and memory utilization per node and cluster-wide metrics.

Contributed by Jim Brennan.
This commit is contained in:
Eric Badger 2020-10-16 19:29:04 +00:00
parent 104dd85ad8
commit c4b42fa1ae
10 changed files with 158 additions and 6 deletions

View File

@ -30,6 +30,7 @@ import org.apache.hadoop.metrics2.annotation.Metrics;
import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem; import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
import org.apache.hadoop.metrics2.lib.MetricsRegistry; import org.apache.hadoop.metrics2.lib.MetricsRegistry;
import org.apache.hadoop.metrics2.lib.MutableGaugeInt; import org.apache.hadoop.metrics2.lib.MutableGaugeInt;
import org.apache.hadoop.metrics2.lib.MutableGaugeLong;
import org.apache.hadoop.metrics2.lib.MutableRate; import org.apache.hadoop.metrics2.lib.MutableRate;
import com.google.common.annotations.VisibleForTesting; import com.google.common.annotations.VisibleForTesting;
@ -48,6 +49,8 @@ public class ClusterMetrics {
@Metric("# of Shutdown NMs") MutableGaugeInt numShutdownNMs; @Metric("# of Shutdown NMs") MutableGaugeInt numShutdownNMs;
@Metric("AM container launch delay") MutableRate aMLaunchDelay; @Metric("AM container launch delay") MutableRate aMLaunchDelay;
@Metric("AM register delay") MutableRate aMRegisterDelay; @Metric("AM register delay") MutableRate aMRegisterDelay;
@Metric("Memory Utilization") MutableGaugeLong utilizedMB;
@Metric("Vcore Utilization") MutableGaugeLong utilizedVirtualCores;
private static final MetricsInfo RECORD_INFO = info("ClusterMetrics", private static final MetricsInfo RECORD_INFO = info("ClusterMetrics",
"Metrics for the Yarn Cluster"); "Metrics for the Yarn Cluster");
@ -190,4 +193,27 @@ public class ClusterMetrics {
aMRegisterDelay.add(delay); aMRegisterDelay.add(delay);
} }
/**
 * Reads the current value of the "Memory Utilization" gauge.
 *
 * @return the value currently held by {@code utilizedMB}
 */
public long getUtilizedMB() {
  final long current = this.utilizedMB.value();
  return current;
}
/**
 * Adds {@code delta} to the "Memory Utilization" gauge.
 *
 * @param delta amount to add to {@code utilizedMB}
 */
public void incrUtilizedMB(long delta) {
  this.utilizedMB.incr(delta);
}
/**
 * Subtracts {@code delta} from the "Memory Utilization" gauge.
 *
 * @param delta amount to subtract from {@code utilizedMB}
 */
public void decrUtilizedMB(long delta) {
  this.utilizedMB.decr(delta);
}
/**
 * Subtracts {@code delta} from the "Vcore Utilization" gauge.
 *
 * @param delta amount to subtract from {@code utilizedVirtualCores}
 */
public void decrUtilizedVirtualCores(long delta) {
  this.utilizedVirtualCores.decr(delta);
}
/**
 * Reads the current value of the "Vcore Utilization" gauge.
 *
 * @return the value currently held by {@code utilizedVirtualCores}
 */
public long getUtilizedVirtualCores() {
  final long current = this.utilizedVirtualCores.value();
  return current;
}
/**
 * Adds {@code delta} to the "Vcore Utilization" gauge.
 *
 * @param delta amount to add to {@code utilizedVirtualCores}
 */
public void incrUtilizedVirtualCores(long delta) {
  this.utilizedVirtualCores.incr(delta);
}
} }

View File

@ -134,6 +134,9 @@ public class RMNodeImpl implements RMNode, EventHandler<RMNodeEvent> {
/* Resource utilization for the node. */ /* Resource utilization for the node. */
private ResourceUtilization nodeUtilization; private ResourceUtilization nodeUtilization;
/* Tracks the last increment made to the utilization metrics. */
private Resource lastUtilIncr = Resources.none();
/** Physical resources in the node. */ /** Physical resources in the node. */
private volatile Resource physicalResource; private volatile Resource physicalResource;
@ -383,7 +386,8 @@ public class RMNodeImpl implements RMNode, EventHandler<RMNodeEvent> {
this.lastHealthReportTime = System.currentTimeMillis(); this.lastHealthReportTime = System.currentTimeMillis();
this.nodeManagerVersion = nodeManagerVersion; this.nodeManagerVersion = nodeManagerVersion;
this.timeStamp = 0; this.timeStamp = 0;
this.physicalResource = physResource; // If physicalResource is not available, capability is a reasonable guess
this.physicalResource = physResource==null ? capability : physResource;
this.latestNodeHeartBeatResponse.setResponseId(0); this.latestNodeHeartBeatResponse.setResponseId(0);
@ -522,6 +526,37 @@ public class RMNodeImpl implements RMNode, EventHandler<RMNodeEvent> {
} }
} }
/**
 * Removes the last recorded increment ({@code lastUtilIncr}) from the
 * cluster utilization gauges and resets the tracked increment to
 * {@code Resources.none()}.
 */
private void clearContributionToUtilizationMetrics() {
  ClusterMetrics clusterMetrics = ClusterMetrics.getMetrics();
  final Resource contributed = lastUtilIncr;
  clusterMetrics.decrUtilizedMB(contributed.getMemorySize());
  clusterMetrics.decrUtilizedVirtualCores(contributed.getVirtualCores());
  lastUtilIncr = Resources.none();
}
/**
 * Recomputes the increment this node contributes to the cluster
 * utilization gauges and applies only the difference from the previously
 * recorded increment ({@code lastUtilIncr}).
 *
 * When {@code nodeUtilization} is null the new increment is
 * {@code Resources.none()}. Otherwise the reported memory and CPU
 * utilization are each divided by the matching physical resource value
 * (floored at 1 to avoid dividing by zero) and multiplied by the node's
 * total capability, so the increment is expressed relative to the
 * configured node size.
 */
private void updateClusterUtilizationMetrics() {
  ClusterMetrics clusterMetrics = ClusterMetrics.getMetrics();
  final Resource previous = lastUtilIncr;
  if (this.nodeUtilization != null) {
    /* Scale memory contribution based on configured node size */
    float physicalMemory =
        Math.max(1.0f, this.getPhysicalResource().getMemorySize());
    float memoryFraction =
        (float) this.nodeUtilization.getPhysicalMemory() / physicalMemory;
    long scaledMemory =
        (long) (memoryFraction * this.getTotalCapability().getMemorySize());
    /* Scale the CPU contribution the same way; truncated to whole cores */
    int scaledVcores = (int) (this.nodeUtilization.getCPU()
        / Math.max(1.0f, this.getPhysicalResource().getVirtualCores())
        * this.getTotalCapability().getVirtualCores());
    lastUtilIncr = Resource.newInstance(scaledMemory, scaledVcores);
  } else {
    lastUtilIncr = Resources.none();
  }
  // Apply only the change since the previous increment; may be negative.
  long memoryDelta = lastUtilIncr.getMemorySize() - previous.getMemorySize();
  int vcoresDelta =
      lastUtilIncr.getVirtualCores() - previous.getVirtualCores();
  clusterMetrics.incrUtilizedMB(memoryDelta);
  clusterMetrics.incrUtilizedVirtualCores(vcoresDelta);
}
@Override @Override
public ResourceUtilization getNodeUtilization() { public ResourceUtilization getNodeUtilization() {
this.readLock.lock(); this.readLock.lock();
@ -680,6 +715,8 @@ public class RMNodeImpl implements RMNode, EventHandler<RMNodeEvent> {
private void updateMetricsForRejoinedNode(NodeState previousNodeState) { private void updateMetricsForRejoinedNode(NodeState previousNodeState) {
ClusterMetrics metrics = ClusterMetrics.getMetrics(); ClusterMetrics metrics = ClusterMetrics.getMetrics();
// Update utilization metrics
this.updateClusterUtilizationMetrics();
metrics.incrNumActiveNodes(); metrics.incrNumActiveNodes();
switch (previousNodeState) { switch (previousNodeState) {
@ -739,6 +776,8 @@ public class RMNodeImpl implements RMNode, EventHandler<RMNodeEvent> {
private void updateMetricsForDeactivatedNode(NodeState initialState, private void updateMetricsForDeactivatedNode(NodeState initialState,
NodeState finalState) { NodeState finalState) {
ClusterMetrics metrics = ClusterMetrics.getMetrics(); ClusterMetrics metrics = ClusterMetrics.getMetrics();
// Update utilization metrics
clearContributionToUtilizationMetrics();
switch (initialState) { switch (initialState) {
case RUNNING: case RUNNING:
@ -1188,6 +1227,7 @@ public class RMNodeImpl implements RMNode, EventHandler<RMNodeEvent> {
statusEvent.getOpportunisticContainersStatus()); statusEvent.getOpportunisticContainersStatus());
NodeHealthStatus remoteNodeHealthStatus = updateRMNodeFromStatusEvents( NodeHealthStatus remoteNodeHealthStatus = updateRMNodeFromStatusEvents(
rmNode, statusEvent); rmNode, statusEvent);
rmNode.updateClusterUtilizationMetrics();
NodeState initialState = rmNode.getState(); NodeState initialState = rmNode.getState();
boolean isNodeDecommissioning = boolean isNodeDecommissioning =
initialState.equals(NodeState.DECOMMISSIONING); initialState.equals(NodeState.DECOMMISSIONING);

View File

@ -21,6 +21,7 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler;
import org.apache.hadoop.classification.InterfaceAudience.Private; import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceStability.Stable; import org.apache.hadoop.classification.InterfaceStability.Stable;
import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceUtilization;
/** /**
* Node usage report. * Node usage report.
@ -30,12 +31,14 @@ import org.apache.hadoop.yarn.api.records.Resource;
public class SchedulerNodeReport { public class SchedulerNodeReport {
private final Resource used; private final Resource used;
private final Resource avail; private final Resource avail;
private final ResourceUtilization utilization;
private final int num; private final int num;
public SchedulerNodeReport(SchedulerNode node) { public SchedulerNodeReport(SchedulerNode node) {
this.used = node.getAllocatedResource(); this.used = node.getAllocatedResource();
this.avail = node.getUnallocatedResource(); this.avail = node.getUnallocatedResource();
this.num = node.getNumContainers(); this.num = node.getNumContainers();
this.utilization = node.getNodeUtilization();
} }
/** /**
@ -58,4 +61,12 @@ public class SchedulerNodeReport {
public int getNumContainers() { public int getNumContainers() {
return num; return num;
} }
/**
 * Returns the node utilization captured when this report was constructed.
 *
 * @return utilization of this node
 */
public ResourceUtilization getUtilization() {
  return this.utilization;
}
} }

View File

@ -105,6 +105,8 @@ public class MetricsOverviewTable extends HtmlBlock {
th().$class("ui-state-default")._("Used Resources")._(). th().$class("ui-state-default")._("Used Resources")._().
th().$class("ui-state-default")._("Total Resources")._(). th().$class("ui-state-default")._("Total Resources")._().
th().$class("ui-state-default")._("Reserved Resources")._(). th().$class("ui-state-default")._("Reserved Resources")._().
th().$class("ui-state-default")._("Physical Mem Used %")._().
th().$class("ui-state-default")._("Physical VCores Used %")._().
_(). _().
_(). _().
tbody().$class("ui-widget-content"). tbody().$class("ui-widget-content").
@ -122,6 +124,8 @@ public class MetricsOverviewTable extends HtmlBlock {
td(usedResources.toString()). td(usedResources.toString()).
td(totalResources.toString()). td(totalResources.toString()).
td(reservedResources.toString()). td(reservedResources.toString()).
td(String.valueOf(clusterMetrics.getUtilizedMBPercent())).
td(String.valueOf(clusterMetrics.getUtilizedVirtualCoresPercent())).
_(). _().
_()._(); _()._();

View File

@ -86,16 +86,20 @@ class NodesPage extends RmView {
trbody.th(".containers", "Containers") trbody.th(".containers", "Containers")
.th(".mem", "Mem Used") .th(".mem", "Mem Used")
.th(".mem", "Mem Avail") .th(".mem", "Mem Avail")
.th(".mem", "Phys Mem Used %")
.th(".vcores", "VCores Used") .th(".vcores", "VCores Used")
.th(".vcores", "VCores Avail") .th(".vcores", "VCores Avail")
.th(".vcores", "Phys VCores Used %")
.th(".gpus", "GPUs Used") .th(".gpus", "GPUs Used")
.th(".gpus", "GPUs Avail"); .th(".gpus", "GPUs Avail");
} else { } else {
trbody.th(".containers", "Running Containers (G)") trbody.th(".containers", "Running Containers (G)")
.th(".mem", "Mem Used (G)") .th(".mem", "Mem Used (G)")
.th(".mem", "Mem Avail (G)") .th(".mem", "Mem Avail (G)")
.th(".mem", "Phys Mem Used %")
.th(".vcores", "VCores Used (G)") .th(".vcores", "VCores Used (G)")
.th(".vcores", "VCores Avail (G)") .th(".vcores", "VCores Avail (G)")
.th(".vcores", "Phys VCores Used %")
.th(".gpus", "GPUs Used (G)") .th(".gpus", "GPUs Used (G)")
.th(".gpus", "GPUs Avail (G)") .th(".gpus", "GPUs Avail (G)")
.th(".containers", "Running Containers (O)") .th(".containers", "Running Containers (O)")
@ -190,10 +194,15 @@ class NodesPage extends RmView {
.append("\",\"").append("<br title='") .append("\",\"").append("<br title='")
.append(String.valueOf(availableMemory)).append("'>") .append(String.valueOf(availableMemory)).append("'>")
.append(StringUtils.byteDesc(availableMemory * BYTES_IN_MB)) .append(StringUtils.byteDesc(availableMemory * BYTES_IN_MB))
.append("\",\"").append(String.valueOf(info.getUsedVirtualCores())) .append("\",\"")
.append(String.valueOf((int) info.getMemUtilization()))
.append("\",\"")
.append(String.valueOf(info.getUsedVirtualCores()))
.append("\",\"") .append("\",\"")
.append(String.valueOf(info.getAvailableVirtualCores())) .append(String.valueOf(info.getAvailableVirtualCores()))
.append("\",\"") .append("\",\"")
.append(String.valueOf((int) info.getVcoreUtilization()))
.append("\",\"")
.append(String.valueOf(usedGPUs)) .append(String.valueOf(usedGPUs))
.append("\",\"") .append("\",\"")
.append(String.valueOf(availableGPUs)) .append(String.valueOf(availableGPUs))

View File

@ -53,6 +53,8 @@ public class ClusterMetricsInfo {
private long totalMB; private long totalMB;
private long totalVirtualCores; private long totalVirtualCores;
private int utilizedMBPercent;
private int utilizedVirtualCoresPercent;
private int totalNodes; private int totalNodes;
private int lostNodes; private int lostNodes;
private int unhealthyNodes; private int unhealthyNodes;
@ -130,6 +132,14 @@ public class ClusterMetricsInfo {
this.totalMB = availableMB + allocatedMB; this.totalMB = availableMB + allocatedMB;
this.totalVirtualCores = availableVirtualCores + allocatedVirtualCores; this.totalVirtualCores = availableVirtualCores + allocatedVirtualCores;
} }
long baseMem = this.totalMB;
this.utilizedMBPercent = baseMem <= 0 ? 0 :
(int) (clusterMetrics.getUtilizedMB() * 100 / baseMem);
long baseCores = this.totalVirtualCores;
this.utilizedVirtualCoresPercent = baseCores <= 0 ? 0 :
(int) (clusterMetrics.getUtilizedVirtualCores() * 100 /
baseCores);
this.activeNodes = clusterMetrics.getNumActiveNMs(); this.activeNodes = clusterMetrics.getNumActiveNMs();
this.lostNodes = clusterMetrics.getNumLostNMs(); this.lostNodes = clusterMetrics.getNumLostNMs();
this.unhealthyNodes = clusterMetrics.getUnhealthyNMs(); this.unhealthyNodes = clusterMetrics.getUnhealthyNMs();
@ -241,6 +251,14 @@ public class ClusterMetricsInfo {
return this.shutdownNodes; return this.shutdownNodes;
} }
/**
 * Returns the stored utilized-memory percentage.
 *
 * @return the value of {@code utilizedMBPercent}
 */
public int getUtilizedMBPercent() {
  return this.utilizedMBPercent;
}
/**
 * Returns the stored utilized-virtual-cores percentage.
 *
 * @return the value of {@code utilizedVirtualCoresPercent}
 */
public int getUtilizedVirtualCoresPercent() {
  return this.utilizedVirtualCoresPercent;
}
public void setContainersReserved(int containersReserved) { public void setContainersReserved(int containersReserved) {
this.containersReserved = containersReserved; this.containersReserved = containersReserved;
} }
@ -345,6 +363,14 @@ public class ClusterMetricsInfo {
return totalUsedResourcesAcrossPartition; return totalUsedResourcesAcrossPartition;
} }
/**
 * Overwrites the stored utilized-memory percentage.
 *
 * @param utilizedMBPercent new value for {@code utilizedMBPercent}
 */
public void setUtilizedMBPercent(final int utilizedMBPercent) {
  this.utilizedMBPercent = utilizedMBPercent;
}
/**
 * Overwrites the stored utilized-virtual-cores percentage.
 *
 * @param utilizedVirtualCoresPercent new value for
 *          {@code utilizedVirtualCoresPercent}
 */
public void setUtilizedVirtualCoresPercent(
    final int utilizedVirtualCoresPercent) {
  this.utilizedVirtualCoresPercent = utilizedVirtualCoresPercent;
}
public ResourceInfo getTotalClusterResourcesAcrossPartition() { public ResourceInfo getTotalClusterResourcesAcrossPartition() {
return totalClusterResourcesAcrossPartition; return totalClusterResourcesAcrossPartition;
} }

View File

@ -28,6 +28,7 @@ import javax.xml.bind.annotation.XmlRootElement;
import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.NodeState; import org.apache.hadoop.yarn.api.records.NodeState;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.server.api.records.OpportunisticContainersStatus; import org.apache.hadoop.yarn.server.api.records.OpportunisticContainersStatus;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
@ -52,6 +53,8 @@ public class NodeInfo {
protected long availMemoryMB; protected long availMemoryMB;
protected long usedVirtualCores; protected long usedVirtualCores;
protected long availableVirtualCores; protected long availableVirtualCores;
private float memUtilization;
private float cpuUtilization;
private int numRunningOpportContainers; private int numRunningOpportContainers;
private long usedMemoryOpportGB; private long usedMemoryOpportGB;
private long usedVirtualCoresOpport; private long usedVirtualCoresOpport;
@ -79,6 +82,23 @@ public class NodeInfo {
report.getAvailableResource().getVirtualCores(); report.getAvailableResource().getVirtualCores();
this.usedResource = new ResourceInfo(report.getUsedResource()); this.usedResource = new ResourceInfo(report.getUsedResource());
this.availableResource = new ResourceInfo(report.getAvailableResource()); this.availableResource = new ResourceInfo(report.getAvailableResource());
Resource totalPhysical = ni.getPhysicalResource();
long nodeMem;
long nodeCores;
if (totalPhysical == null) {
nodeMem =
this.usedMemoryMB + this.availMemoryMB;
// If we don't know the number of physical cores, assume 1. Not
// accurate but better than nothing.
nodeCores = 1;
} else {
nodeMem = totalPhysical.getMemorySize();
nodeCores = totalPhysical.getVirtualCores();
}
this.memUtilization = nodeMem <= 0 ? 0
: (float)report.getUtilization().getPhysicalMemory() * 100F / nodeMem;
this.cpuUtilization =
(float)report.getUtilization().getCPU() * 100F / nodeCores;
} }
this.id = id.toString(); this.id = id.toString();
this.rack = ni.getRackName(); this.rack = ni.getRackName();
@ -207,6 +227,22 @@ public class NodeInfo {
return this.resourceUtilization; return this.resourceUtilization;
} }
/**
 * Returns the stored memory utilization figure for this node.
 *
 * @return the value of {@code memUtilization}
 */
public float getMemUtilization() {
  return this.memUtilization;
}
/**
 * Overwrites the stored memory utilization figure for this node.
 *
 * @param util new value for {@code memUtilization}
 */
public void setMemUtilization(final float util) {
  memUtilization = util;
}
/**
 * Returns the stored CPU utilization figure for this node. The backing
 * field is named {@code cpuUtilization}.
 *
 * @return the value of {@code cpuUtilization}
 */
public float getVcoreUtilization() {
  return this.cpuUtilization;
}
/**
 * Overwrites the stored CPU utilization figure for this node. The backing
 * field is named {@code cpuUtilization}.
 *
 * @param util new value for {@code cpuUtilization}
 */
public void setVcoreUtilization(final float util) {
  cpuUtilization = util;
}
@VisibleForTesting @VisibleForTesting
public void setId(String id) { public void setId(String id) {
this.id = id; this.id = id;

View File

@ -52,8 +52,8 @@ public class TestNodesPage {
// Number of Actual Table Headers for NodesPage.NodesBlock might change in // Number of Actual Table Headers for NodesPage.NodesBlock might change in
// future. In that case this value should be adjusted to the new value. // future. In that case this value should be adjusted to the new value.
final int numberOfThInMetricsTable = 20; final int numberOfThInMetricsTable = 22;
final int numberOfActualTableHeaders = 15; final int numberOfActualTableHeaders = 17;
private final int numberOfThForOpportunisticContainers = 4; private final int numberOfThForOpportunisticContainers = 4;
private Injector injector; private Injector injector;

View File

@ -433,7 +433,7 @@ public class TestRMWebServices extends JerseyTestBase {
Exception { Exception {
assertEquals("incorrect number of elements", 1, json.length()); assertEquals("incorrect number of elements", 1, json.length());
JSONObject clusterinfo = json.getJSONObject("clusterMetrics"); JSONObject clusterinfo = json.getJSONObject("clusterMetrics");
assertEquals("incorrect number of elements", 27, clusterinfo.length()); assertEquals("incorrect number of elements", 29, clusterinfo.length());
verifyClusterMetrics( verifyClusterMetrics(
clusterinfo.getInt("appsSubmitted"), clusterinfo.getInt("appsCompleted"), clusterinfo.getInt("appsSubmitted"), clusterinfo.getInt("appsCompleted"),
clusterinfo.getInt("reservedMB"), clusterinfo.getInt("availableMB"), clusterinfo.getInt("reservedMB"), clusterinfo.getInt("availableMB"),

View File

@ -713,7 +713,7 @@ public class TestRMWebServicesNodes extends JerseyTestBase {
public void verifyNodeInfo(JSONObject nodeInfo, RMNode nm) public void verifyNodeInfo(JSONObject nodeInfo, RMNode nm)
throws JSONException, Exception { throws JSONException, Exception {
assertEquals("incorrect number of elements", 18, nodeInfo.length()); assertEquals("incorrect number of elements", 20, nodeInfo.length());
JSONObject resourceInfo = nodeInfo.getJSONObject("resourceUtilization"); JSONObject resourceInfo = nodeInfo.getJSONObject("resourceUtilization");
verifyNodeInfoGeneric(nm, nodeInfo.getString("state"), verifyNodeInfoGeneric(nm, nodeInfo.getString("state"),