YARN-10450. Add cpu and memory utilization per node and cluster-wide metrics.
Contributed by Jim Brennan.
(cherry picked from commit 8b8c672780
)
This commit is contained in:
parent
d5b4d04b0d
commit
8abf939152
|
@ -30,6 +30,7 @@ import org.apache.hadoop.metrics2.annotation.Metrics;
|
|||
import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
|
||||
import org.apache.hadoop.metrics2.lib.MetricsRegistry;
|
||||
import org.apache.hadoop.metrics2.lib.MutableGaugeInt;
|
||||
import org.apache.hadoop.metrics2.lib.MutableGaugeLong;
|
||||
import org.apache.hadoop.metrics2.lib.MutableRate;
|
||||
import com.google.common.annotations.VisibleForTesting;
|
||||
|
||||
|
@ -50,6 +51,8 @@ public class ClusterMetrics {
|
|||
@Metric("AM register delay") MutableRate aMRegisterDelay;
|
||||
@Metric("AM container allocation delay")
|
||||
private MutableRate aMContainerAllocationDelay;
|
||||
@Metric("Memory Utilization") MutableGaugeLong utilizedMB;
|
||||
@Metric("Vcore Utilization") MutableGaugeLong utilizedVirtualCores;
|
||||
|
||||
private static final MetricsInfo RECORD_INFO = info("ClusterMetrics",
|
||||
"Metrics for the Yarn Cluster");
|
||||
|
@ -199,4 +202,28 @@ public class ClusterMetrics {
|
|||
public MutableRate getAMContainerAllocationDelay() {
|
||||
return aMContainerAllocationDelay;
|
||||
}
|
||||
|
||||
public long getUtilizedMB() {
|
||||
return utilizedMB.value();
|
||||
}
|
||||
|
||||
public void incrUtilizedMB(long delta) {
|
||||
utilizedMB.incr(delta);
|
||||
}
|
||||
|
||||
public void decrUtilizedMB(long delta) {
|
||||
utilizedMB.decr(delta);
|
||||
}
|
||||
|
||||
public void decrUtilizedVirtualCores(long delta) {
|
||||
utilizedVirtualCores.decr(delta);
|
||||
}
|
||||
|
||||
public long getUtilizedVirtualCores() {
|
||||
return utilizedVirtualCores.value();
|
||||
}
|
||||
|
||||
public void incrUtilizedVirtualCores(long delta) {
|
||||
utilizedVirtualCores.incr(delta);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -141,6 +141,9 @@ public class RMNodeImpl implements RMNode, EventHandler<RMNodeEvent> {
|
|||
/* Resource utilization for the node. */
|
||||
private ResourceUtilization nodeUtilization;
|
||||
|
||||
/* Track last increment made to Utilization metrics*/
|
||||
private Resource lastUtilIncr = Resources.none();
|
||||
|
||||
/** Physical resources in the node. */
|
||||
private volatile Resource physicalResource;
|
||||
|
||||
|
@ -399,7 +402,8 @@ public class RMNodeImpl implements RMNode, EventHandler<RMNodeEvent> {
|
|||
this.lastHealthReportTime = System.currentTimeMillis();
|
||||
this.nodeManagerVersion = nodeManagerVersion;
|
||||
this.timeStamp = 0;
|
||||
this.physicalResource = physResource;
|
||||
// If physicalResource is not available, capability is a reasonable guess
|
||||
this.physicalResource = physResource==null ? capability : physResource;
|
||||
|
||||
this.latestNodeHeartBeatResponse.setResponseId(0);
|
||||
|
||||
|
@ -548,6 +552,37 @@ public class RMNodeImpl implements RMNode, EventHandler<RMNodeEvent> {
|
|||
}
|
||||
}
|
||||
|
||||
private void clearContributionToUtilizationMetrics() {
|
||||
ClusterMetrics metrics = ClusterMetrics.getMetrics();
|
||||
metrics.decrUtilizedMB(lastUtilIncr.getMemorySize());
|
||||
metrics.decrUtilizedVirtualCores(lastUtilIncr.getVirtualCores());
|
||||
lastUtilIncr = Resources.none();
|
||||
}
|
||||
|
||||
private void updateClusterUtilizationMetrics() {
|
||||
// Update cluster utilization metrics
|
||||
ClusterMetrics metrics = ClusterMetrics.getMetrics();
|
||||
Resource prevIncr = lastUtilIncr;
|
||||
|
||||
if (this.nodeUtilization == null) {
|
||||
lastUtilIncr = Resources.none();
|
||||
} else {
|
||||
/* Scale memory contribution based on configured node size */
|
||||
long newmem = (long)((float)this.nodeUtilization.getPhysicalMemory()
|
||||
/ Math.max(1.0f, this.getPhysicalResource().getMemorySize())
|
||||
* this.getTotalCapability().getMemorySize());
|
||||
lastUtilIncr =
|
||||
Resource.newInstance(newmem,
|
||||
(int) (this.nodeUtilization.getCPU()
|
||||
/ Math.max(1.0f, this.getPhysicalResource().getVirtualCores())
|
||||
* this.getTotalCapability().getVirtualCores()));
|
||||
}
|
||||
metrics.incrUtilizedMB(lastUtilIncr.getMemorySize() -
|
||||
prevIncr.getMemorySize());
|
||||
metrics.incrUtilizedVirtualCores(lastUtilIncr.getVirtualCores() -
|
||||
prevIncr.getVirtualCores());
|
||||
}
|
||||
|
||||
@Override
|
||||
public ResourceUtilization getNodeUtilization() {
|
||||
this.readLock.lock();
|
||||
|
@ -706,6 +741,8 @@ public class RMNodeImpl implements RMNode, EventHandler<RMNodeEvent> {
|
|||
|
||||
private void updateMetricsForRejoinedNode(NodeState previousNodeState) {
|
||||
ClusterMetrics metrics = ClusterMetrics.getMetrics();
|
||||
// Update utilization metrics
|
||||
this.updateClusterUtilizationMetrics();
|
||||
|
||||
switch (previousNodeState) {
|
||||
case LOST:
|
||||
|
@ -764,6 +801,8 @@ public class RMNodeImpl implements RMNode, EventHandler<RMNodeEvent> {
|
|||
private void updateMetricsForDeactivatedNode(NodeState initialState,
|
||||
NodeState finalState) {
|
||||
ClusterMetrics metrics = ClusterMetrics.getMetrics();
|
||||
// Update utilization metrics
|
||||
clearContributionToUtilizationMetrics();
|
||||
|
||||
switch (initialState) {
|
||||
case RUNNING:
|
||||
|
@ -1260,6 +1299,7 @@ public class RMNodeImpl implements RMNode, EventHandler<RMNodeEvent> {
|
|||
statusEvent.getOpportunisticContainersStatus());
|
||||
NodeHealthStatus remoteNodeHealthStatus = updateRMNodeFromStatusEvents(
|
||||
rmNode, statusEvent);
|
||||
rmNode.updateClusterUtilizationMetrics();
|
||||
NodeState initialState = rmNode.getState();
|
||||
boolean isNodeDecommissioning =
|
||||
initialState.equals(NodeState.DECOMMISSIONING);
|
||||
|
|
|
@ -21,6 +21,7 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler;
|
|||
import org.apache.hadoop.classification.InterfaceAudience.Private;
|
||||
import org.apache.hadoop.classification.InterfaceStability.Stable;
|
||||
import org.apache.hadoop.yarn.api.records.Resource;
|
||||
import org.apache.hadoop.yarn.api.records.ResourceUtilization;
|
||||
|
||||
/**
|
||||
* Node usage report.
|
||||
|
@ -30,12 +31,14 @@ import org.apache.hadoop.yarn.api.records.Resource;
|
|||
public class SchedulerNodeReport {
|
||||
private final Resource used;
|
||||
private final Resource avail;
|
||||
private final ResourceUtilization utilization;
|
||||
private final int num;
|
||||
|
||||
public SchedulerNodeReport(SchedulerNode node) {
|
||||
this.used = node.getAllocatedResource();
|
||||
this.avail = node.getUnallocatedResource();
|
||||
this.num = node.getNumContainers();
|
||||
this.utilization = node.getNodeUtilization();
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -58,4 +61,12 @@ public class SchedulerNodeReport {
|
|||
public int getNumContainers() {
|
||||
return num;
|
||||
}
|
||||
|
||||
/**
|
||||
*
|
||||
* @return utilization of this node
|
||||
*/
|
||||
public ResourceUtilization getUtilization() {
|
||||
return utilization;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -105,6 +105,8 @@ public class MetricsOverviewTable extends HtmlBlock {
|
|||
th().$class("ui-state-default").__("Used Resources").__().
|
||||
th().$class("ui-state-default").__("Total Resources").__().
|
||||
th().$class("ui-state-default").__("Reserved Resources").__().
|
||||
th().$class("ui-state-default").__("Physical Mem Used %").__().
|
||||
th().$class("ui-state-default").__("Physical VCores Used %").__().
|
||||
__().
|
||||
__().
|
||||
tbody().$class("ui-widget-content").
|
||||
|
@ -122,6 +124,8 @@ public class MetricsOverviewTable extends HtmlBlock {
|
|||
td(usedResources.toString()).
|
||||
td(totalResources.toString()).
|
||||
td(reservedResources.toString()).
|
||||
td(String.valueOf(clusterMetrics.getUtilizedMBPercent())).
|
||||
td(String.valueOf(clusterMetrics.getUtilizedVirtualCoresPercent())).
|
||||
__().
|
||||
__().__();
|
||||
|
||||
|
|
|
@ -87,8 +87,10 @@ class NodesPage extends RmView {
|
|||
.th(".allocationTags", "Allocation Tags")
|
||||
.th(".mem", "Mem Used")
|
||||
.th(".mem", "Mem Avail")
|
||||
.th(".mem", "Phys Mem Used %")
|
||||
.th(".vcores", "VCores Used")
|
||||
.th(".vcores", "VCores Avail")
|
||||
.th(".vcores", "Phys VCores Used %")
|
||||
.th(".gpus", "GPUs Used")
|
||||
.th(".gpus", "GPUs Avail");
|
||||
} else {
|
||||
|
@ -96,8 +98,10 @@ class NodesPage extends RmView {
|
|||
.th(".allocationTags", "Allocation Tags")
|
||||
.th(".mem", "Mem Used (G)")
|
||||
.th(".mem", "Mem Avail (G)")
|
||||
.th(".mem", "Phys Mem Used %")
|
||||
.th(".vcores", "VCores Used (G)")
|
||||
.th(".vcores", "VCores Avail (G)")
|
||||
.th(".vcores", "Phys VCores Used %")
|
||||
.th(".gpus", "GPUs Used (G)")
|
||||
.th(".gpus", "GPUs Avail (G)")
|
||||
.th(".containers", "Running Containers (O)")
|
||||
|
@ -193,10 +197,15 @@ class NodesPage extends RmView {
|
|||
.append("\",\"").append("<br title='")
|
||||
.append(String.valueOf(availableMemory)).append("'>")
|
||||
.append(StringUtils.byteDesc(availableMemory * BYTES_IN_MB))
|
||||
.append("\",\"").append(String.valueOf(info.getUsedVirtualCores()))
|
||||
.append("\",\"")
|
||||
.append(String.valueOf((int) info.getMemUtilization()))
|
||||
.append("\",\"")
|
||||
.append(String.valueOf(info.getUsedVirtualCores()))
|
||||
.append("\",\"")
|
||||
.append(String.valueOf(info.getAvailableVirtualCores()))
|
||||
.append("\",\"")
|
||||
.append(String.valueOf((int) info.getVcoreUtilization()))
|
||||
.append("\",\"")
|
||||
.append(String.valueOf(usedGPUs))
|
||||
.append("\",\"")
|
||||
.append(String.valueOf(availableGPUs))
|
||||
|
|
|
@ -55,6 +55,8 @@ public class ClusterMetricsInfo {
|
|||
|
||||
private long totalMB;
|
||||
private long totalVirtualCores;
|
||||
private int utilizedMBPercent;
|
||||
private int utilizedVirtualCoresPercent;
|
||||
private int totalNodes;
|
||||
private int lostNodes;
|
||||
private int unhealthyNodes;
|
||||
|
@ -134,6 +136,14 @@ public class ClusterMetricsInfo {
|
|||
this.totalMB = availableMB + allocatedMB;
|
||||
this.totalVirtualCores = availableVirtualCores + allocatedVirtualCores;
|
||||
}
|
||||
long baseMem = this.totalMB;
|
||||
this.utilizedMBPercent = baseMem <= 0 ? 0 :
|
||||
(int) (clusterMetrics.getUtilizedMB() * 100 / baseMem);
|
||||
long baseCores = this.totalVirtualCores;
|
||||
this.utilizedVirtualCoresPercent = baseCores <= 0 ? 0 :
|
||||
(int) (clusterMetrics.getUtilizedVirtualCores() * 100 /
|
||||
baseCores);
|
||||
|
||||
this.activeNodes = clusterMetrics.getNumActiveNMs();
|
||||
this.lostNodes = clusterMetrics.getNumLostNMs();
|
||||
this.unhealthyNodes = clusterMetrics.getUnhealthyNMs();
|
||||
|
@ -253,6 +263,14 @@ public class ClusterMetricsInfo {
|
|||
return this.shutdownNodes;
|
||||
}
|
||||
|
||||
public int getUtilizedMBPercent() {
|
||||
return utilizedMBPercent;
|
||||
}
|
||||
|
||||
public int getUtilizedVirtualCoresPercent() {
|
||||
return utilizedVirtualCoresPercent;
|
||||
}
|
||||
|
||||
public void setContainersReserved(int containersReserved) {
|
||||
this.containersReserved = containersReserved;
|
||||
}
|
||||
|
@ -357,6 +375,14 @@ public class ClusterMetricsInfo {
|
|||
return totalUsedResourcesAcrossPartition;
|
||||
}
|
||||
|
||||
public void setUtilizedMBPercent(int utilizedMBPercent) {
|
||||
this.utilizedMBPercent = utilizedMBPercent;
|
||||
}
|
||||
|
||||
public void setUtilizedVirtualCoresPercent(int utilizedVirtualCoresPercent) {
|
||||
this.utilizedVirtualCoresPercent = utilizedVirtualCoresPercent;
|
||||
}
|
||||
|
||||
public ResourceInfo getTotalClusterResourcesAcrossPartition() {
|
||||
return totalClusterResourcesAcrossPartition;
|
||||
}
|
||||
|
|
|
@ -30,6 +30,7 @@ import javax.xml.bind.annotation.XmlRootElement;
|
|||
import org.apache.hadoop.yarn.api.records.NodeAttribute;
|
||||
import org.apache.hadoop.yarn.api.records.NodeId;
|
||||
import org.apache.hadoop.yarn.api.records.NodeState;
|
||||
import org.apache.hadoop.yarn.api.records.Resource;
|
||||
import org.apache.hadoop.yarn.server.api.records.OpportunisticContainersStatus;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
|
||||
|
@ -54,6 +55,8 @@ public class NodeInfo {
|
|||
protected long availMemoryMB;
|
||||
protected long usedVirtualCores;
|
||||
protected long availableVirtualCores;
|
||||
private float memUtilization;
|
||||
private float cpuUtilization;
|
||||
private int numRunningOpportContainers;
|
||||
private long usedMemoryOpportGB;
|
||||
private long usedVirtualCoresOpport;
|
||||
|
@ -84,6 +87,23 @@ public class NodeInfo {
|
|||
report.getAvailableResource().getVirtualCores();
|
||||
this.usedResource = new ResourceInfo(report.getUsedResource());
|
||||
this.availableResource = new ResourceInfo(report.getAvailableResource());
|
||||
Resource totalPhysical = ni.getPhysicalResource();
|
||||
long nodeMem;
|
||||
long nodeCores;
|
||||
if (totalPhysical == null) {
|
||||
nodeMem =
|
||||
this.usedMemoryMB + this.availMemoryMB;
|
||||
// If we don't know the number of physical cores, assume 1. Not
|
||||
// accurate but better than nothing.
|
||||
nodeCores = 1;
|
||||
} else {
|
||||
nodeMem = totalPhysical.getMemorySize();
|
||||
nodeCores = totalPhysical.getVirtualCores();
|
||||
}
|
||||
this.memUtilization = nodeMem <= 0 ? 0
|
||||
: (float)report.getUtilization().getPhysicalMemory() * 100F / nodeMem;
|
||||
this.cpuUtilization =
|
||||
(float)report.getUtilization().getCPU() * 100F / nodeCores;
|
||||
}
|
||||
this.id = id.toString();
|
||||
this.rack = ni.getRackName();
|
||||
|
@ -229,6 +249,22 @@ public class NodeInfo {
|
|||
return this.resourceUtilization;
|
||||
}
|
||||
|
||||
public float getMemUtilization() {
|
||||
return memUtilization;
|
||||
}
|
||||
|
||||
public void setMemUtilization(float util) {
|
||||
this.memUtilization = util;
|
||||
}
|
||||
|
||||
public float getVcoreUtilization() {
|
||||
return cpuUtilization;
|
||||
}
|
||||
|
||||
public void setVcoreUtilization(float util) {
|
||||
this.cpuUtilization = util;
|
||||
}
|
||||
|
||||
public String getAllocationTagsSummary() {
|
||||
return this.allocationTags == null ? "" :
|
||||
this.allocationTags.toString();
|
||||
|
|
|
@ -52,8 +52,8 @@ public class TestNodesPage {
|
|||
|
||||
// Number of Actual Table Headers for NodesPage.NodesBlock might change in
|
||||
// future. In that case this value should be adjusted to the new value.
|
||||
private final int numberOfThInMetricsTable = 20;
|
||||
private final int numberOfActualTableHeaders = 16;
|
||||
private final int numberOfThInMetricsTable = 22;
|
||||
private final int numberOfActualTableHeaders = 18;
|
||||
private final int numberOfThForOpportunisticContainers = 4;
|
||||
|
||||
private Injector injector;
|
||||
|
|
|
@ -474,7 +474,7 @@ public class TestRMWebServices extends JerseyTestBase {
|
|||
Exception {
|
||||
assertEquals("incorrect number of elements", 1, json.length());
|
||||
JSONObject clusterinfo = json.getJSONObject("clusterMetrics");
|
||||
assertEquals("incorrect number of elements", 29, clusterinfo.length());
|
||||
assertEquals("incorrect number of elements", 31, clusterinfo.length());
|
||||
verifyClusterMetrics(
|
||||
clusterinfo.getInt("appsSubmitted"), clusterinfo.getInt("appsCompleted"),
|
||||
clusterinfo.getInt("reservedMB"), clusterinfo.getInt("availableMB"),
|
||||
|
|
|
@ -861,7 +861,7 @@ public class TestRMWebServicesNodes extends JerseyTestBase {
|
|||
|
||||
public void verifyNodeInfo(JSONObject nodeInfo, RMNode nm)
|
||||
throws JSONException, Exception {
|
||||
assertEquals("incorrect number of elements", 21, nodeInfo.length());
|
||||
assertEquals("incorrect number of elements", 23, nodeInfo.length());
|
||||
|
||||
JSONObject resourceInfo = nodeInfo.getJSONObject("resourceUtilization");
|
||||
verifyNodeInfoGeneric(nm, nodeInfo.getString("state"),
|
||||
|
|
Loading…
Reference in New Issue