YARN-4538. QueueMetrics pending cores and memory metrics wrong. (Bibin A Chundatt via wangda)
(cherry picked from commit 9523648d57
)
This commit is contained in:
parent
4a30a44b11
commit
c1193b46d4
|
@ -1210,6 +1210,9 @@ Release 2.8.0 - UNRELEASED
|
|||
YARN-4581. AHS writer thread leak makes RM crash while RM is recovering.
|
||||
(sandflee via junping_du)
|
||||
|
||||
YARN-4538. QueueMetrics pending cores and memory metrics wrong.
|
||||
(Bibin A Chundatt via wangda)
|
||||
|
||||
Release 2.7.3 - UNRELEASED
|
||||
|
||||
INCOMPATIBLE CHANGES
|
||||
|
|
|
@ -526,8 +526,8 @@ public class AppSchedulingInfo {
|
|||
}
|
||||
|
||||
// Set queue metrics
|
||||
queue.getMetrics().allocateResources(user, 0,
|
||||
increaseRequest.getDeltaCapacity(), true);
|
||||
queue.getMetrics().allocateResources(user,
|
||||
increaseRequest.getDeltaCapacity());
|
||||
|
||||
// remove the increase request from pending increase request map
|
||||
removeIncreaseRequest(nodeId, priority, containerId);
|
||||
|
@ -550,7 +550,7 @@ public class AppSchedulingInfo {
|
|||
}
|
||||
|
||||
// Set queue metrics
|
||||
queue.getMetrics().releaseResources(user, 0, absDelta);
|
||||
queue.getMetrics().releaseResources(user, absDelta);
|
||||
|
||||
// update usage
|
||||
appResourceUsage.decUsed(decreaseRequest.getNodePartition(), absDelta);
|
||||
|
|
|
@ -44,7 +44,6 @@ import org.apache.hadoop.yarn.api.records.Resource;
|
|||
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
|
||||
import org.apache.hadoop.yarn.server.utils.BuilderUtils;
|
||||
import org.apache.hadoop.yarn.util.resource.Resources;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
|
@ -379,10 +378,9 @@ public class QueueMetrics implements MetricsSource {
|
|||
}
|
||||
|
||||
private void _decrPendingResources(int containers, Resource res) {
|
||||
// if #container = 0, means change container resource
|
||||
pendingContainers.decr(containers);
|
||||
pendingMB.decr(res.getMemory() * Math.max(containers, 1));
|
||||
pendingVCores.decr(res.getVirtualCores() * Math.max(containers, 1));
|
||||
pendingMB.decr(res.getMemory() * containers);
|
||||
pendingVCores.decr(res.getVirtualCores() * containers);
|
||||
}
|
||||
|
||||
public void incrNodeTypeAggregations(String user, NodeType type) {
|
||||
|
@ -406,12 +404,11 @@ public class QueueMetrics implements MetricsSource {
|
|||
|
||||
public void allocateResources(String user, int containers, Resource res,
|
||||
boolean decrPending) {
|
||||
// if #containers = 0, means change container resource
|
||||
allocatedContainers.incr(containers);
|
||||
aggregateContainersAllocated.incr(containers);
|
||||
|
||||
allocatedMB.incr(res.getMemory() * Math.max(containers, 1));
|
||||
allocatedVCores.incr(res.getVirtualCores() * Math.max(containers, 1));
|
||||
allocatedMB.incr(res.getMemory() * containers);
|
||||
allocatedVCores.incr(res.getVirtualCores() * containers);
|
||||
if (decrPending) {
|
||||
_decrPendingResources(containers, res);
|
||||
}
|
||||
|
@ -424,12 +421,33 @@ public class QueueMetrics implements MetricsSource {
|
|||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Allocate Resource for container size change.
|
||||
*
|
||||
* @param user
|
||||
* @param res
|
||||
*/
|
||||
public void allocateResources(String user, Resource res) {
|
||||
allocatedMB.incr(res.getMemory());
|
||||
allocatedVCores.incr(res.getVirtualCores());
|
||||
|
||||
pendingMB.decr(res.getMemory());
|
||||
pendingVCores.decr(res.getVirtualCores());
|
||||
|
||||
QueueMetrics userMetrics = getUserMetrics(user);
|
||||
if (userMetrics != null) {
|
||||
userMetrics.allocateResources(user, res);
|
||||
}
|
||||
if (parent != null) {
|
||||
parent.allocateResources(user, res);
|
||||
}
|
||||
}
|
||||
|
||||
public void releaseResources(String user, int containers, Resource res) {
|
||||
// if #container = 0, means change container resource.
|
||||
allocatedContainers.decr(containers);
|
||||
aggregateContainersReleased.incr(containers);
|
||||
allocatedMB.decr(res.getMemory() * Math.max(containers, 1));
|
||||
allocatedVCores.decr(res.getVirtualCores() * Math.max(containers, 1));
|
||||
allocatedMB.decr(res.getMemory() * containers);
|
||||
allocatedVCores.decr(res.getVirtualCores() * containers);
|
||||
QueueMetrics userMetrics = getUserMetrics(user);
|
||||
if (userMetrics != null) {
|
||||
userMetrics.releaseResources(user, containers, res);
|
||||
|
@ -439,6 +457,24 @@ public class QueueMetrics implements MetricsSource {
|
|||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Release Resource for container size change.
|
||||
*
|
||||
* @param user
|
||||
* @param res
|
||||
*/
|
||||
public void releaseResources(String user, Resource res) {
|
||||
allocatedMB.decr(res.getMemory());
|
||||
allocatedVCores.decr(res.getVirtualCores());
|
||||
QueueMetrics userMetrics = getUserMetrics(user);
|
||||
if (userMetrics != null) {
|
||||
userMetrics.releaseResources(user, res);
|
||||
}
|
||||
if (parent != null) {
|
||||
parent.releaseResources(user, res);
|
||||
}
|
||||
}
|
||||
|
||||
public void reserveResource(String user, Resource res) {
|
||||
reservedContainers.incr();
|
||||
reservedMB.incr(res.getMemory());
|
||||
|
|
|
@ -87,6 +87,14 @@ public class TestQueueMetrics {
|
|||
metrics.releaseResources(user, 1, Resources.createResource(2*GB, 2));
|
||||
checkResources(queueSource, 4*GB, 4, 2, 3, 1, 100*GB, 100, 9*GB, 9, 2, 0, 0, 0);
|
||||
|
||||
metrics.incrPendingResources(user, 0, Resources.createResource(2 * GB, 2));
|
||||
checkResources(queueSource, 4 * GB, 4, 2, 3, 1, 100 * GB, 100, 9 * GB, 9, 2,
|
||||
0, 0, 0);
|
||||
|
||||
metrics.decrPendingResources(user, 0, Resources.createResource(2 * GB, 2));
|
||||
checkResources(queueSource, 4 * GB, 4, 2, 3, 1, 100 * GB, 100, 9 * GB, 9, 2,
|
||||
0, 0, 0);
|
||||
|
||||
metrics.finishAppAttempt(
|
||||
app.getApplicationId(), app.isPending(), app.getUser());
|
||||
checkApps(queueSource, 1, 0, 0, 0, 0, 0, true);
|
||||
|
|
Loading…
Reference in New Issue