YARN-4538. QueueMetrics pending cores and memory metrics wrong. (Bibin A Chundatt via wangda)

This commit is contained in:
Wangda Tan 2016-01-18 10:56:35 +08:00
parent 02f597c5db
commit 9523648d57
4 changed files with 60 additions and 13 deletions

View File

@ -1265,6 +1265,9 @@ Release 2.8.0 - UNRELEASED
YARN-4581. AHS writer thread leak makes RM crash while RM is recovering. YARN-4581. AHS writer thread leak makes RM crash while RM is recovering.
(sandflee via junping_du) (sandflee via junping_du)
YARN-4538. QueueMetrics pending cores and memory metrics wrong.
(Bibin A Chundatt via wangda)
Release 2.7.3 - UNRELEASED Release 2.7.3 - UNRELEASED
INCOMPATIBLE CHANGES INCOMPATIBLE CHANGES

View File

@ -526,8 +526,8 @@ public class AppSchedulingInfo {
} }
// Set queue metrics // Set queue metrics
queue.getMetrics().allocateResources(user, 0, queue.getMetrics().allocateResources(user,
increaseRequest.getDeltaCapacity(), true); increaseRequest.getDeltaCapacity());
// remove the increase request from pending increase request map // remove the increase request from pending increase request map
removeIncreaseRequest(nodeId, priority, containerId); removeIncreaseRequest(nodeId, priority, containerId);
@ -550,7 +550,7 @@ public class AppSchedulingInfo {
} }
// Set queue metrics // Set queue metrics
queue.getMetrics().releaseResources(user, 0, absDelta); queue.getMetrics().releaseResources(user, absDelta);
// update usage // update usage
appResourceUsage.decUsed(decreaseRequest.getNodePartition(), absDelta); appResourceUsage.decUsed(decreaseRequest.getNodePartition(), absDelta);

View File

@ -44,7 +44,6 @@ import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
import org.apache.hadoop.yarn.server.utils.BuilderUtils; import org.apache.hadoop.yarn.server.utils.BuilderUtils;
import org.apache.hadoop.yarn.util.resource.Resources;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
@ -379,10 +378,9 @@ public class QueueMetrics implements MetricsSource {
} }
private void _decrPendingResources(int containers, Resource res) { private void _decrPendingResources(int containers, Resource res) {
// if #container = 0, means change container resource
pendingContainers.decr(containers); pendingContainers.decr(containers);
pendingMB.decr(res.getMemory() * Math.max(containers, 1)); pendingMB.decr(res.getMemory() * containers);
pendingVCores.decr(res.getVirtualCores() * Math.max(containers, 1)); pendingVCores.decr(res.getVirtualCores() * containers);
} }
public void incrNodeTypeAggregations(String user, NodeType type) { public void incrNodeTypeAggregations(String user, NodeType type) {
@ -406,12 +404,11 @@ public class QueueMetrics implements MetricsSource {
public void allocateResources(String user, int containers, Resource res, public void allocateResources(String user, int containers, Resource res,
boolean decrPending) { boolean decrPending) {
// if #containers = 0, means change container resource
allocatedContainers.incr(containers); allocatedContainers.incr(containers);
aggregateContainersAllocated.incr(containers); aggregateContainersAllocated.incr(containers);
allocatedMB.incr(res.getMemory() * Math.max(containers, 1)); allocatedMB.incr(res.getMemory() * containers);
allocatedVCores.incr(res.getVirtualCores() * Math.max(containers, 1)); allocatedVCores.incr(res.getVirtualCores() * containers);
if (decrPending) { if (decrPending) {
_decrPendingResources(containers, res); _decrPendingResources(containers, res);
} }
@ -424,12 +421,33 @@ public class QueueMetrics implements MetricsSource {
} }
} }
/**
* Allocate Resource for container size change.
*
* @param user
* @param res
*/
public void allocateResources(String user, Resource res) {
allocatedMB.incr(res.getMemory());
allocatedVCores.incr(res.getVirtualCores());
pendingMB.decr(res.getMemory());
pendingVCores.decr(res.getVirtualCores());
QueueMetrics userMetrics = getUserMetrics(user);
if (userMetrics != null) {
userMetrics.allocateResources(user, res);
}
if (parent != null) {
parent.allocateResources(user, res);
}
}
public void releaseResources(String user, int containers, Resource res) { public void releaseResources(String user, int containers, Resource res) {
// if #container = 0, means change container resource.
allocatedContainers.decr(containers); allocatedContainers.decr(containers);
aggregateContainersReleased.incr(containers); aggregateContainersReleased.incr(containers);
allocatedMB.decr(res.getMemory() * Math.max(containers, 1)); allocatedMB.decr(res.getMemory() * containers);
allocatedVCores.decr(res.getVirtualCores() * Math.max(containers, 1)); allocatedVCores.decr(res.getVirtualCores() * containers);
QueueMetrics userMetrics = getUserMetrics(user); QueueMetrics userMetrics = getUserMetrics(user);
if (userMetrics != null) { if (userMetrics != null) {
userMetrics.releaseResources(user, containers, res); userMetrics.releaseResources(user, containers, res);
@ -439,6 +457,24 @@ public class QueueMetrics implements MetricsSource {
} }
} }
/**
* Release Resource for container size change.
*
* @param user
* @param res
*/
public void releaseResources(String user, Resource res) {
allocatedMB.decr(res.getMemory());
allocatedVCores.decr(res.getVirtualCores());
QueueMetrics userMetrics = getUserMetrics(user);
if (userMetrics != null) {
userMetrics.releaseResources(user, res);
}
if (parent != null) {
parent.releaseResources(user, res);
}
}
public void reserveResource(String user, Resource res) { public void reserveResource(String user, Resource res) {
reservedContainers.incr(); reservedContainers.incr();
reservedMB.incr(res.getMemory()); reservedMB.incr(res.getMemory());

View File

@ -87,6 +87,14 @@ public class TestQueueMetrics {
metrics.releaseResources(user, 1, Resources.createResource(2*GB, 2)); metrics.releaseResources(user, 1, Resources.createResource(2*GB, 2));
checkResources(queueSource, 4*GB, 4, 2, 3, 1, 100*GB, 100, 9*GB, 9, 2, 0, 0, 0); checkResources(queueSource, 4*GB, 4, 2, 3, 1, 100*GB, 100, 9*GB, 9, 2, 0, 0, 0);
metrics.incrPendingResources(user, 0, Resources.createResource(2 * GB, 2));
checkResources(queueSource, 4 * GB, 4, 2, 3, 1, 100 * GB, 100, 9 * GB, 9, 2,
0, 0, 0);
metrics.decrPendingResources(user, 0, Resources.createResource(2 * GB, 2));
checkResources(queueSource, 4 * GB, 4, 2, 3, 1, 100 * GB, 100, 9 * GB, 9, 2,
0, 0, 0);
metrics.finishAppAttempt( metrics.finishAppAttempt(
app.getApplicationId(), app.isPending(), app.getUser()); app.getApplicationId(), app.isPending(), app.getUser());
checkApps(queueSource, 1, 0, 0, 0, 0, 0, true); checkApps(queueSource, 1, 0, 0, 0, 0, 0, true);