YARN-6467. CSQueueMetrics needs to update the current metrics for default partition only. Contributed by Manikandan R.

This commit is contained in:
Naganarasimha 2017-06-29 00:09:30 +05:30
parent 44d43a8848
commit a5ae7c0cae
12 changed files with 308 additions and 207 deletions

View File

@ -18,6 +18,19 @@
package org.apache.hadoop.yarn.server.resourcemanager.scheduler; package org.apache.hadoop.yarn.server.resourcemanager.scheduler;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentSkipListSet;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience.Private; import org.apache.hadoop.classification.InterfaceAudience.Private;
@ -31,27 +44,12 @@ import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.SchedulingMode; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.SchedulingMode;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.PendingAsk;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.LocalitySchedulingPlacementSet; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.LocalitySchedulingPlacementSet;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.ResourceRequestUpdateResult; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.ResourceRequestUpdateResult;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.SchedulingPlacementSet; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.SchedulingPlacementSet;
import org.apache.hadoop.yarn.server.scheduler.SchedulerRequestKey; import org.apache.hadoop.yarn.server.scheduler.SchedulerRequestKey;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.PendingAsk;
import org.apache.hadoop.yarn.util.resource.Resources; import org.apache.hadoop.yarn.util.resource.Resources;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentSkipListSet;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.ReentrantReadWriteLock;
/** /**
* This class keeps track of all the consumption of an application. This also * This class keeps track of all the consumption of an application. This also
* keeps track of current running/completed containers for the application. * keeps track of current running/completed containers for the application.
@ -261,10 +259,13 @@ public class AppSchedulingInfo {
Resource lastRequestCapability = Resource lastRequestCapability =
lastRequest != null ? lastRequest.getCapability() : Resources.none(); lastRequest != null ? lastRequest.getCapability() : Resources.none();
metrics.incrPendingResources(user, metrics.incrPendingResources(request.getNodeLabelExpression(), user,
request.getNumContainers(), request.getCapability()); request.getNumContainers(), request.getCapability());
metrics.decrPendingResources(user,
lastRequestContainers, lastRequestCapability); if(lastRequest != null) {
metrics.decrPendingResources(lastRequest.getNodeLabelExpression(), user,
lastRequestContainers, lastRequestCapability);
}
// update queue: // update queue:
Resource increasedResource = Resource increasedResource =
@ -440,7 +441,7 @@ public class AppSchedulingInfo {
writeLock.lock(); writeLock.lock();
if (null != containerAllocated) { if (null != containerAllocated) {
updateMetricsForAllocatedContainer(type, containerAllocated); updateMetricsForAllocatedContainer(type, node, containerAllocated);
} }
return schedulerKeyToPlacementSets.get(schedulerKey).allocate( return schedulerKeyToPlacementSets.get(schedulerKey).allocate(
@ -464,10 +465,12 @@ public class AppSchedulingInfo {
for (SchedulingPlacementSet ps : schedulerKeyToPlacementSets.values()) { for (SchedulingPlacementSet ps : schedulerKeyToPlacementSets.values()) {
PendingAsk ask = ps.getPendingAsk(ResourceRequest.ANY); PendingAsk ask = ps.getPendingAsk(ResourceRequest.ANY);
if (ask.getCount() > 0) { if (ask.getCount() > 0) {
oldMetrics.decrPendingResources(user, ask.getCount(), oldMetrics.decrPendingResources(
ask.getPerAllocationResource()); ps.getPrimaryRequestedNodePartition(),
newMetrics.incrPendingResources(user, ask.getCount(), user, ask.getCount(), ask.getPerAllocationResource());
ask.getPerAllocationResource()); newMetrics.incrPendingResources(
ps.getPrimaryRequestedNodePartition(),
user, ask.getCount(), ask.getPerAllocationResource());
Resource delta = Resources.multiply(ask.getPerAllocationResource(), Resource delta = Resources.multiply(ask.getPerAllocationResource(),
ask.getCount()); ask.getCount());
@ -497,8 +500,8 @@ public class AppSchedulingInfo {
for (SchedulingPlacementSet ps : schedulerKeyToPlacementSets.values()) { for (SchedulingPlacementSet ps : schedulerKeyToPlacementSets.values()) {
PendingAsk ask = ps.getPendingAsk(ResourceRequest.ANY); PendingAsk ask = ps.getPendingAsk(ResourceRequest.ANY);
if (ask.getCount() > 0) { if (ask.getCount() > 0) {
metrics.decrPendingResources(user, ask.getCount(), metrics.decrPendingResources(ps.getPrimaryRequestedNodePartition(),
ask.getPerAllocationResource()); user, ask.getCount(), ask.getPerAllocationResource());
// Update Queue // Update Queue
queue.decPendingResource( queue.decPendingResource(
@ -558,8 +561,8 @@ public class AppSchedulingInfo {
return; return;
} }
metrics.allocateResources(user, 1, rmContainer.getAllocatedResource(), metrics.allocateResources(rmContainer.getNodeLabelExpression(),
false); user, 1, rmContainer.getAllocatedResource(), false);
} finally { } finally {
this.writeLock.unlock(); this.writeLock.unlock();
} }
@ -583,8 +586,8 @@ public class AppSchedulingInfo {
} }
} }
private void updateMetricsForAllocatedContainer( private void updateMetricsForAllocatedContainer(NodeType type,
NodeType type, Container containerAllocated) { SchedulerNode node, Container containerAllocated) {
QueueMetrics metrics = queue.getMetrics(); QueueMetrics metrics = queue.getMetrics();
if (pending) { if (pending) {
// once an allocation is done we assume the application is // once an allocation is done we assume the application is
@ -600,8 +603,10 @@ public class AppSchedulingInfo {
+ containerAllocated.getResource() + " type=" + containerAllocated.getResource() + " type="
+ type); + type);
} }
metrics.allocateResources(user, 1, containerAllocated.getResource(), if(node != null) {
true); metrics.allocateResources(node.getPartition(), user, 1,
containerAllocated.getResource(), true);
}
metrics.incrNodeTypeAggregations(user, type); metrics.incrNodeTypeAggregations(user, type);
} }

View File

@ -43,6 +43,7 @@ import org.apache.hadoop.metrics2.lib.MutableRate;
import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
import org.apache.hadoop.yarn.server.utils.BuilderUtils; import org.apache.hadoop.yarn.server.utils.BuilderUtils;
import org.slf4j.Logger; import org.slf4j.Logger;
@ -59,38 +60,45 @@ public class QueueMetrics implements MetricsSource {
@Metric("# of apps completed") MutableCounterInt appsCompleted; @Metric("# of apps completed") MutableCounterInt appsCompleted;
@Metric("# of apps killed") MutableCounterInt appsKilled; @Metric("# of apps killed") MutableCounterInt appsKilled;
@Metric("# of apps failed") MutableCounterInt appsFailed; @Metric("# of apps failed") MutableCounterInt appsFailed;
@Metric("Allocated memory in MB") MutableGaugeLong allocatedMB;
@Metric("Allocated CPU in virtual cores") MutableGaugeInt allocatedVCores;
@Metric("# of allocated containers") MutableGaugeInt allocatedContainers;
@Metric("Aggregate # of allocated containers") MutableCounterLong aggregateContainersAllocated;
@Metric("Aggregate # of allocated node-local containers") @Metric("Aggregate # of allocated node-local containers")
MutableCounterLong aggregateNodeLocalContainersAllocated; MutableCounterLong aggregateNodeLocalContainersAllocated;
@Metric("Aggregate # of allocated rack-local containers") @Metric("Aggregate # of allocated rack-local containers")
MutableCounterLong aggregateRackLocalContainersAllocated; MutableCounterLong aggregateRackLocalContainersAllocated;
@Metric("Aggregate # of allocated off-switch containers") @Metric("Aggregate # of allocated off-switch containers")
MutableCounterLong aggregateOffSwitchContainersAllocated; MutableCounterLong aggregateOffSwitchContainersAllocated;
@Metric("Aggregate # of released containers") MutableCounterLong aggregateContainersReleased;
@Metric("Aggregate # of preempted containers") MutableCounterLong @Metric("Aggregate # of preempted containers") MutableCounterLong
aggregateContainersPreempted; aggregateContainersPreempted;
@Metric("# of active users") MutableGaugeInt activeUsers;
@Metric("# of active applications") MutableGaugeInt activeApplications;
@Metric("App Attempt First Container Allocation Delay")
MutableRate appAttemptFirstContainerAllocationDelay;
//Metrics updated only for "default" partition
@Metric("Allocated memory in MB") MutableGaugeLong allocatedMB;
@Metric("Allocated CPU in virtual cores") MutableGaugeInt allocatedVCores;
@Metric("# of allocated containers") MutableGaugeInt allocatedContainers;
@Metric("Aggregate # of allocated containers")
MutableCounterLong aggregateContainersAllocated;
@Metric("Aggregate # of released containers")
MutableCounterLong aggregateContainersReleased;
@Metric("Available memory in MB") MutableGaugeLong availableMB; @Metric("Available memory in MB") MutableGaugeLong availableMB;
@Metric("Available CPU in virtual cores") MutableGaugeInt availableVCores; @Metric("Available CPU in virtual cores") MutableGaugeInt availableVCores;
@Metric("Pending memory allocation in MB") MutableGaugeLong pendingMB; @Metric("Pending memory allocation in MB") MutableGaugeLong pendingMB;
@Metric("Pending CPU allocation in virtual cores") MutableGaugeInt pendingVCores; @Metric("Pending CPU allocation in virtual cores")
MutableGaugeInt pendingVCores;
@Metric("# of pending containers") MutableGaugeInt pendingContainers; @Metric("# of pending containers") MutableGaugeInt pendingContainers;
@Metric("# of reserved memory in MB") MutableGaugeLong reservedMB; @Metric("# of reserved memory in MB") MutableGaugeLong reservedMB;
@Metric("Reserved CPU in virtual cores") MutableGaugeInt reservedVCores; @Metric("Reserved CPU in virtual cores") MutableGaugeInt reservedVCores;
@Metric("# of reserved containers") MutableGaugeInt reservedContainers; @Metric("# of reserved containers") MutableGaugeInt reservedContainers;
@Metric("# of active users") MutableGaugeInt activeUsers;
@Metric("# of active applications") MutableGaugeInt activeApplications;
@Metric("App Attempt First Container Allocation Delay") MutableRate appAttemptFirstContainerAllocationDelay;
private final MutableGaugeInt[] runningTime; private final MutableGaugeInt[] runningTime;
private TimeBucketMetrics<ApplicationId> runBuckets; private TimeBucketMetrics<ApplicationId> runBuckets;
static final Logger LOG = LoggerFactory.getLogger(QueueMetrics.class); static final Logger LOG = LoggerFactory.getLogger(QueueMetrics.class);
static final MetricsInfo RECORD_INFO = info("QueueMetrics", static final MetricsInfo RECORD_INFO = info("QueueMetrics",
"Metrics for the resource scheduler"); "Metrics for the resource scheduler");
protected static final MetricsInfo QUEUE_INFO = info("Queue", "Metrics by queue"); protected static final MetricsInfo QUEUE_INFO =
info("Queue", "Metrics by queue");
protected static final MetricsInfo USER_INFO = protected static final MetricsInfo USER_INFO =
info("User", "Metrics by user"); info("User", "Metrics by user");
static final Splitter Q_SPLITTER = static final Splitter Q_SPLITTER =
@ -334,41 +342,61 @@ public class QueueMetrics implements MetricsSource {
/** /**
* Set available resources. To be called by scheduler periodically as * Set available resources. To be called by scheduler periodically as
* resources become available. * resources become available.
* @param partition Node Partition
* @param limit resource limit * @param limit resource limit
*/ */
public void setAvailableResourcesToQueue(Resource limit) { public void setAvailableResourcesToQueue(String partition, Resource limit) {
availableMB.set(limit.getMemorySize()); if(partition == null || partition.equals(RMNodeLabelsManager.NO_LABEL)) {
availableVCores.set(limit.getVirtualCores()); availableMB.set(limit.getMemorySize());
availableVCores.set(limit.getVirtualCores());
}
} }
/** /**
* Set available resources. To be called by scheduler periodically as * Set available resources. To be called by scheduler periodically as
* resources become available. * resources become available.
* @param limit resource limit
*/
public void setAvailableResourcesToQueue(Resource limit) {
this.setAvailableResourcesToQueue(RMNodeLabelsManager.NO_LABEL, limit);
}
/**
* Set available resources. To be called by scheduler periodically as
* resources become available.
* @param partition Node Partition
* @param user * @param user
* @param limit resource limit * @param limit resource limit
*/ */
public void setAvailableResourcesToUser(String user, Resource limit) { public void setAvailableResourcesToUser(String partition,
QueueMetrics userMetrics = getUserMetrics(user); String user, Resource limit) {
if (userMetrics != null) { if(partition == null || partition.equals(RMNodeLabelsManager.NO_LABEL)) {
userMetrics.setAvailableResourcesToQueue(limit); QueueMetrics userMetrics = getUserMetrics(user);
if (userMetrics != null) {
userMetrics.setAvailableResourcesToQueue(partition, limit);
}
} }
} }
/** /**
* Increment pending resource metrics * Increment pending resource metrics
* @param partition Node Partition
* @param user * @param user
* @param containers * @param containers
* @param res the TOTAL delta of resources note this is different from * @param res the TOTAL delta of resources note this is different from
* the other APIs which use per container resource * the other APIs which use per container resource
*/ */
public void incrPendingResources(String user, int containers, Resource res) { public void incrPendingResources(String partition, String user,
_incrPendingResources(containers, res); int containers, Resource res) {
QueueMetrics userMetrics = getUserMetrics(user); if(partition == null || partition.equals(RMNodeLabelsManager.NO_LABEL)) {
if (userMetrics != null) { _incrPendingResources(containers, res);
userMetrics.incrPendingResources(user, containers, res); QueueMetrics userMetrics = getUserMetrics(user);
} if (userMetrics != null) {
if (parent != null) { userMetrics.incrPendingResources(partition, user, containers, res);
parent.incrPendingResources(user, containers, res); }
if (parent != null) {
parent.incrPendingResources(partition, user, containers, res);
}
} }
} }
@ -378,14 +406,18 @@ public class QueueMetrics implements MetricsSource {
pendingVCores.incr(res.getVirtualCores() * containers); pendingVCores.incr(res.getVirtualCores() * containers);
} }
public void decrPendingResources(String user, int containers, Resource res) {
_decrPendingResources(containers, res); public void decrPendingResources(String partition, String user,
QueueMetrics userMetrics = getUserMetrics(user); int containers, Resource res) {
if (userMetrics != null) { if(partition == null || partition.equals(RMNodeLabelsManager.NO_LABEL)) {
userMetrics.decrPendingResources(user, containers, res); _decrPendingResources(containers, res);
} QueueMetrics userMetrics = getUserMetrics(user);
if (parent != null) { if (userMetrics != null) {
parent.decrPendingResources(user, containers, res); userMetrics.decrPendingResources(partition, user, containers, res);
}
if (parent != null) {
parent.decrPendingResources(partition, user, containers, res);
}
} }
} }
@ -414,58 +446,66 @@ public class QueueMetrics implements MetricsSource {
} }
} }
public void allocateResources(String user, int containers, Resource res, public void allocateResources(String partition, String user,
boolean decrPending) { int containers, Resource res, boolean decrPending) {
allocatedContainers.incr(containers); if(partition == null || partition.equals(RMNodeLabelsManager.NO_LABEL)) {
aggregateContainersAllocated.incr(containers); allocatedContainers.incr(containers);
aggregateContainersAllocated.incr(containers);
allocatedMB.incr(res.getMemorySize() * containers); allocatedMB.incr(res.getMemorySize() * containers);
allocatedVCores.incr(res.getVirtualCores() * containers); allocatedVCores.incr(res.getVirtualCores() * containers);
if (decrPending) { if (decrPending) {
_decrPendingResources(containers, res); _decrPendingResources(containers, res);
} }
QueueMetrics userMetrics = getUserMetrics(user); QueueMetrics userMetrics = getUserMetrics(user);
if (userMetrics != null) { if (userMetrics != null) {
userMetrics.allocateResources(user, containers, res, decrPending); userMetrics.allocateResources(partition, user,
} containers, res, decrPending);
if (parent != null) { }
parent.allocateResources(user, containers, res, decrPending); if (parent != null) {
parent.allocateResources(partition, user, containers, res, decrPending);
}
} }
} }
/** /**
* Allocate Resource for container size change. * Allocate Resource for container size change.
* * @param partition Node Partition
* @param user * @param user
* @param res * @param res
*/ */
public void allocateResources(String user, Resource res) { public void allocateResources(String partition, String user, Resource res) {
allocatedMB.incr(res.getMemorySize()); if(partition == null || partition.equals(RMNodeLabelsManager.NO_LABEL)) {
allocatedVCores.incr(res.getVirtualCores()); allocatedMB.incr(res.getMemorySize());
allocatedVCores.incr(res.getVirtualCores());
pendingMB.decr(res.getMemorySize()); pendingMB.decr(res.getMemorySize());
pendingVCores.decr(res.getVirtualCores()); pendingVCores.decr(res.getVirtualCores());
QueueMetrics userMetrics = getUserMetrics(user); QueueMetrics userMetrics = getUserMetrics(user);
if (userMetrics != null) { if (userMetrics != null) {
userMetrics.allocateResources(user, res); userMetrics.allocateResources(partition, user, res);
} }
if (parent != null) { if (parent != null) {
parent.allocateResources(user, res); parent.allocateResources(partition, user, res);
}
} }
} }
public void releaseResources(String user, int containers, Resource res) { public void releaseResources(String partition,
allocatedContainers.decr(containers); String user, int containers, Resource res) {
aggregateContainersReleased.incr(containers); if(partition == null || partition.equals(RMNodeLabelsManager.NO_LABEL)) {
allocatedMB.decr(res.getMemorySize() * containers); allocatedContainers.decr(containers);
allocatedVCores.decr(res.getVirtualCores() * containers); aggregateContainersReleased.incr(containers);
QueueMetrics userMetrics = getUserMetrics(user); allocatedMB.decr(res.getMemorySize() * containers);
if (userMetrics != null) { allocatedVCores.decr(res.getVirtualCores() * containers);
userMetrics.releaseResources(user, containers, res); QueueMetrics userMetrics = getUserMetrics(user);
} if (userMetrics != null) {
if (parent != null) { userMetrics.releaseResources(partition, user, containers, res);
parent.releaseResources(user, containers, res); }
if (parent != null) {
parent.releaseResources(partition, user, containers, res);
}
} }
} }
@ -494,6 +534,12 @@ public class QueueMetrics implements MetricsSource {
} }
} }
public void reserveResource(String partition, String user, Resource res) {
if(partition == null || partition.equals(RMNodeLabelsManager.NO_LABEL)) {
reserveResource(user, res);
}
}
public void reserveResource(String user, Resource res) { public void reserveResource(String user, Resource res) {
reservedContainers.incr(); reservedContainers.incr();
reservedMB.incr(res.getMemorySize()); reservedMB.incr(res.getMemorySize());
@ -520,6 +566,12 @@ public class QueueMetrics implements MetricsSource {
} }
} }
public void unreserveResource(String partition, String user, Resource res) {
if(partition == null || partition.equals(RMNodeLabelsManager.NO_LABEL)) {
unreserveResource(user, res);
}
}
public void incrActiveUsers() { public void incrActiveUsers() {
activeUsers.incr(); activeUsers.incr();
} }

View File

@ -1073,15 +1073,19 @@ public class SchedulerApplicationAttempt implements SchedulableEntity {
for (RMContainer liveContainer : liveContainers.values()) { for (RMContainer liveContainer : liveContainers.values()) {
Resource resource = liveContainer.getContainer().getResource(); Resource resource = liveContainer.getContainer().getResource();
((RMContainerImpl) liveContainer).setQueueName(newQueueName); ((RMContainerImpl) liveContainer).setQueueName(newQueueName);
oldMetrics.releaseResources(user, 1, resource); oldMetrics.releaseResources(liveContainer.getNodeLabelExpression(),
newMetrics.allocateResources(user, 1, resource, false); user, 1, resource);
newMetrics.allocateResources(liveContainer.getNodeLabelExpression(),
user, 1, resource, false);
} }
for (Map<NodeId, RMContainer> map : reservedContainers.values()) { for (Map<NodeId, RMContainer> map : reservedContainers.values()) {
for (RMContainer reservedContainer : map.values()) { for (RMContainer reservedContainer : map.values()) {
((RMContainerImpl) reservedContainer).setQueueName(newQueueName); ((RMContainerImpl) reservedContainer).setQueueName(newQueueName);
Resource resource = reservedContainer.getReservedResource(); Resource resource = reservedContainer.getReservedResource();
oldMetrics.unreserveResource(user, resource); oldMetrics.unreserveResource(
newMetrics.reserveResource(user, resource); reservedContainer.getNodeLabelExpression(), user, resource);
newMetrics.reserveResource(
reservedContainer.getNodeLabelExpression(), user, resource);
} }
} }

View File

@ -26,12 +26,14 @@ import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
import org.apache.hadoop.metrics2.lib.MutableGaugeFloat; import org.apache.hadoop.metrics2.lib.MutableGaugeFloat;
import org.apache.hadoop.metrics2.lib.MutableGaugeLong; import org.apache.hadoop.metrics2.lib.MutableGaugeLong;
import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Queue; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Queue;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
@Metrics(context = "yarn") @Metrics(context = "yarn")
public class CSQueueMetrics extends QueueMetrics { public class CSQueueMetrics extends QueueMetrics {
//Metrics updated only for "default" partition
@Metric("AM memory limit in MB") @Metric("AM memory limit in MB")
MutableGaugeLong AMResourceLimitMB; MutableGaugeLong AMResourceLimitMB;
@Metric("AM CPU limit in virtual cores") @Metric("AM CPU limit in virtual cores")
@ -66,33 +68,40 @@ public class CSQueueMetrics extends QueueMetrics {
return usedAMResourceVCores.value(); return usedAMResourceVCores.value();
} }
public void setAMResouceLimit(Resource res) { public void setAMResouceLimit(String partition, Resource res) {
AMResourceLimitMB.set(res.getMemorySize()); if(partition == null || partition.equals(RMNodeLabelsManager.NO_LABEL)) {
AMResourceLimitVCores.set(res.getVirtualCores()); AMResourceLimitMB.set(res.getMemorySize());
} AMResourceLimitVCores.set(res.getVirtualCores());
public void setAMResouceLimitForUser(String user, Resource res) {
CSQueueMetrics userMetrics = (CSQueueMetrics) getUserMetrics(user);
if (userMetrics != null) {
userMetrics.setAMResouceLimit(res);
} }
} }
public void incAMUsed(String user, Resource res) { public void setAMResouceLimitForUser(String partition,
usedAMResourceMB.incr(res.getMemorySize()); String user, Resource res) {
usedAMResourceVCores.incr(res.getVirtualCores());
CSQueueMetrics userMetrics = (CSQueueMetrics) getUserMetrics(user); CSQueueMetrics userMetrics = (CSQueueMetrics) getUserMetrics(user);
if (userMetrics != null) { if (userMetrics != null) {
userMetrics.incAMUsed(user, res); userMetrics.setAMResouceLimit(partition, res);
} }
} }
public void decAMUsed(String user, Resource res) { public void incAMUsed(String partition, String user, Resource res) {
usedAMResourceMB.decr(res.getMemorySize()); if(partition == null || partition.equals(RMNodeLabelsManager.NO_LABEL)) {
usedAMResourceVCores.decr(res.getVirtualCores()); usedAMResourceMB.incr(res.getMemorySize());
CSQueueMetrics userMetrics = (CSQueueMetrics) getUserMetrics(user); usedAMResourceVCores.incr(res.getVirtualCores());
if (userMetrics != null) { CSQueueMetrics userMetrics = (CSQueueMetrics) getUserMetrics(user);
userMetrics.decAMUsed(user, res); if (userMetrics != null) {
userMetrics.incAMUsed(partition, user, res);
}
}
}
public void decAMUsed(String partition, String user, Resource res) {
if(partition == null || partition.equals(RMNodeLabelsManager.NO_LABEL)) {
usedAMResourceMB.decr(res.getMemorySize());
usedAMResourceVCores.decr(res.getVirtualCores());
CSQueueMetrics userMetrics = (CSQueueMetrics) getUserMetrics(user);
if (userMetrics != null) {
userMetrics.decAMUsed(partition, user, res);
}
} }
} }
@ -100,16 +109,21 @@ public class CSQueueMetrics extends QueueMetrics {
return usedCapacity.value(); return usedCapacity.value();
} }
public void setUsedCapacity(float usedCapacity) { public void setUsedCapacity(String partition, float usedCap) {
this.usedCapacity.set(usedCapacity); if(partition == null || partition.equals(RMNodeLabelsManager.NO_LABEL)) {
this.usedCapacity.set(usedCap);
}
} }
public float getAbsoluteUsedCapacity() { public float getAbsoluteUsedCapacity() {
return absoluteUsedCapacity.value(); return absoluteUsedCapacity.value();
} }
public void setAbsoluteUsedCapacity(Float absoluteUsedCapacity) { public void setAbsoluteUsedCapacity(String partition,
this.absoluteUsedCapacity.set(absoluteUsedCapacity); Float absoluteUsedCap) {
if(partition == null || partition.equals(RMNodeLabelsManager.NO_LABEL)) {
this.absoluteUsedCapacity.set(absoluteUsedCap);
}
} }
public synchronized static CSQueueMetrics forQueue(String queueName, public synchronized static CSQueueMetrics forQueue(String queueName,

View File

@ -230,13 +230,13 @@ class CSQueueUtils {
// QueueMetrics does not support per-label capacities, // QueueMetrics does not support per-label capacities,
// so we report values only for the default partition. // so we report values only for the default partition.
if (nodePartition.equals(CommonNodeLabelsManager.NO_LABEL)) {
queueMetrics.setUsedCapacity( queueMetrics.setUsedCapacity(nodePartition,
queueCapacities.getUsedCapacity(RMNodeLabelsManager.NO_LABEL)); queueCapacities.getUsedCapacity(RMNodeLabelsManager.NO_LABEL));
queueMetrics.setAbsoluteUsedCapacity( queueMetrics.setAbsoluteUsedCapacity(nodePartition,
queueCapacities.getAbsoluteUsedCapacity( queueCapacities.getAbsoluteUsedCapacity(
RMNodeLabelsManager.NO_LABEL)); RMNodeLabelsManager.NO_LABEL));
}
} }
private static Resource getMaxAvailableResourceToQueue( private static Resource getMaxAvailableResourceToQueue(
@ -302,7 +302,7 @@ class CSQueueUtils {
// Update queue metrics w.r.t node labels. In a generic way, we can // Update queue metrics w.r.t node labels. In a generic way, we can
// calculate available resource from all labels in cluster. // calculate available resource from all labels in cluster.
childQueue.getMetrics().setAvailableResourcesToQueue( childQueue.getMetrics().setAvailableResourcesToQueue(nodePartition,
getMaxAvailableResourceToQueue(rc, nlm, childQueue, cluster)); getMaxAvailableResourceToQueue(rc, nlm, childQueue, cluster));
} }
} }

View File

@ -744,7 +744,7 @@ public class LeafQueue extends AbstractCSQueue {
resourceCalculator, queuePartitionUsableResource, amResourcePercent, resourceCalculator, queuePartitionUsableResource, amResourcePercent,
minimumAllocation); minimumAllocation);
metrics.setAMResouceLimit(amResouceLimit); metrics.setAMResouceLimit(nodePartition, amResouceLimit);
queueUsage.setAMLimit(nodePartition, amResouceLimit); queueUsage.setAMLimit(nodePartition, amResouceLimit);
return amResouceLimit; return amResouceLimit;
} finally { } finally {
@ -859,9 +859,10 @@ public class LeafQueue extends AbstractCSQueue {
user.getResourceUsage().incAMUsed(partitionName, user.getResourceUsage().incAMUsed(partitionName,
application.getAMResource(partitionName)); application.getAMResource(partitionName));
user.getResourceUsage().setAMLimit(partitionName, userAMLimit); user.getResourceUsage().setAMLimit(partitionName, userAMLimit);
metrics.incAMUsed(application.getUser(), metrics.incAMUsed(partitionName, application.getUser(),
application.getAMResource(partitionName)); application.getAMResource(partitionName));
metrics.setAMResouceLimitForUser(application.getUser(), userAMLimit); metrics.setAMResouceLimitForUser(partitionName,
application.getUser(), userAMLimit);
fsApp.remove(); fsApp.remove();
LOG.info("Application " + applicationId + " from user: " + application LOG.info("Application " + applicationId + " from user: " + application
.getUser() + " activated in queue: " + getQueueName()); .getUser() + " activated in queue: " + getQueueName());
@ -942,7 +943,7 @@ public class LeafQueue extends AbstractCSQueue {
application.getAMResource(partitionName)); application.getAMResource(partitionName));
user.getResourceUsage().decAMUsed(partitionName, user.getResourceUsage().decAMUsed(partitionName,
application.getAMResource(partitionName)); application.getAMResource(partitionName));
metrics.decAMUsed(application.getUser(), metrics.decAMUsed(partitionName, application.getUser(),
application.getAMResource(partitionName)); application.getAMResource(partitionName));
} }
applicationAttemptMap.remove(application.getApplicationAttemptId()); applicationAttemptMap.remove(application.getApplicationAttemptId());
@ -1386,7 +1387,7 @@ public class LeafQueue extends AbstractCSQueue {
application.setHeadroomProvider(headroomProvider); application.setHeadroomProvider(headroomProvider);
metrics.setAvailableResourcesToUser(user, headroom); metrics.setAvailableResourcesToUser(nodePartition, user, headroom);
return userLimit; return userLimit;
} }
@ -1741,7 +1742,8 @@ public class LeafQueue extends AbstractCSQueue {
// Note this is a bit unconventional since it gets the object and modifies // Note this is a bit unconventional since it gets the object and modifies
// it here, rather then using set routine // it here, rather then using set routine
Resources.subtractFrom(application.getHeadroom(), resource); // headroom Resources.subtractFrom(application.getHeadroom(), resource); // headroom
metrics.setAvailableResourcesToUser(userName, application.getHeadroom()); metrics.setAvailableResourcesToUser(nodePartition,
userName, application.getHeadroom());
if (LOG.isDebugEnabled()) { if (LOG.isDebugEnabled()) {
LOG.debug(getQueueName() + " user=" + userName + " used=" + queueUsage LOG.debug(getQueueName() + " user=" + userName + " used=" + queueUsage
@ -1786,7 +1788,8 @@ public class LeafQueue extends AbstractCSQueue {
user.updateUsageRatio(resourceCalculator, resourceByLabel, user.updateUsageRatio(resourceCalculator, resourceByLabel,
nodePartition)); nodePartition));
metrics.setAvailableResourcesToUser(userName, application.getHeadroom()); metrics.setAvailableResourcesToUser(nodePartition,
userName, application.getHeadroom());
if (LOG.isDebugEnabled()) { if (LOG.isDebugEnabled()) {
LOG.debug( LOG.debug(

View File

@ -18,7 +18,15 @@
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica; package org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica;
import com.google.common.annotations.VisibleForTesting; import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience.Private; import org.apache.hadoop.classification.InterfaceAudience.Private;
@ -65,28 +73,18 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.Scheduli
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.allocator.AbstractContainerAllocator; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.allocator.AbstractContainerAllocator;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.allocator.ContainerAllocator; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.allocator.ContainerAllocator;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.ContainerAllocationProposal; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.ContainerAllocationProposal;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.PendingAsk;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.ResourceCommitRequest; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.ResourceCommitRequest;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.SchedulerContainer; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.SchedulerContainer;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.PlacementSet; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.PlacementSet;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.SchedulingPlacementSet; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.SchedulingPlacementSet;
import org.apache.hadoop.yarn.server.scheduler.SchedulerRequestKey; import org.apache.hadoop.yarn.server.scheduler.SchedulerRequestKey;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.PendingAsk;
import org.apache.hadoop.yarn.server.utils.BuilderUtils; import org.apache.hadoop.yarn.server.utils.BuilderUtils;
import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator; import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator;
import org.apache.hadoop.yarn.util.resource.ResourceCalculator; import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
import org.apache.hadoop.yarn.util.resource.Resources; import org.apache.hadoop.yarn.util.resource.Resources;
import java.util.Collections; import com.google.common.annotations.VisibleForTesting;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.locks.ReentrantReadWriteLock;
/** /**
* Represents an application attempt from the viewpoint of the FIFO or Capacity * Represents an application attempt from the viewpoint of the FIFO or Capacity
@ -198,7 +196,8 @@ public class FiCaSchedulerApp extends SchedulerApplicationAttempt {
"SchedulerApp", getApplicationId(), containerId, containerResource); "SchedulerApp", getApplicationId(), containerId, containerResource);
// Update usage metrics // Update usage metrics
queue.getMetrics().releaseResources(getUser(), 1, containerResource); queue.getMetrics().releaseResources(partition,
getUser(), 1, containerResource);
attemptResourceUsage.decUsed(partition, containerResource); attemptResourceUsage.decUsed(partition, containerResource);
// Clear resource utilization metrics cache. // Clear resource utilization metrics cache.
@ -572,8 +571,8 @@ public class FiCaSchedulerApp extends SchedulerApplicationAttempt {
node.unreserveResource(this); node.unreserveResource(this);
// Update reserved metrics // Update reserved metrics
queue.getMetrics().unreserveResource(getUser(), queue.getMetrics().unreserveResource(node.getPartition(),
rmContainer.getReservedResource()); getUser(), rmContainer.getReservedResource());
queue.decReservedResource(node.getPartition(), queue.decReservedResource(node.getPartition(),
rmContainer.getReservedResource()); rmContainer.getReservedResource());
return true; return true;
@ -782,7 +781,7 @@ public class FiCaSchedulerApp extends SchedulerApplicationAttempt {
// Update reserved metrics if this is the first reservation // Update reserved metrics if this is the first reservation
// rmContainer will be moved to reserved in the super.reserve // rmContainer will be moved to reserved in the super.reserve
if (!reReservation) { if (!reReservation) {
queue.getMetrics().reserveResource( queue.getMetrics().reserveResource(node.getPartition(),
getUser(), container.getResource()); getUser(), container.getResource());
} }

View File

@ -18,6 +18,16 @@
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair; package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair;
import java.text.DecimalFormat;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience.Private; import org.apache.hadoop.classification.InterfaceAudience.Private;
@ -51,16 +61,6 @@ import org.apache.hadoop.yarn.server.utils.BuilderUtils;
import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator; import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator;
import org.apache.hadoop.yarn.util.resource.Resources; import org.apache.hadoop.yarn.util.resource.Resources;
import java.text.DecimalFormat;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
/** /**
* Represents an application attempt from the viewpoint of the Fair Scheduler. * Represents an application attempt from the viewpoint of the Fair Scheduler.
*/ */
@ -169,7 +169,9 @@ public class FSAppAttempt extends SchedulerApplicationAttempt
"SchedulerApp", getApplicationId(), containerId, containerResource); "SchedulerApp", getApplicationId(), containerId, containerResource);
// Update usage metrics // Update usage metrics
queue.getMetrics().releaseResources(getUser(), 1, containerResource); queue.getMetrics().releaseResources(
rmContainer.getNodeLabelExpression(),
getUser(), 1, containerResource);
this.attemptResourceUsage.decUsed(containerResource); this.attemptResourceUsage.decUsed(containerResource);
// Clear resource utilization metrics cache. // Clear resource utilization metrics cache.
@ -653,7 +655,7 @@ public class FSAppAttempt extends SchedulerApplicationAttempt
reservedContainer = reservedContainer =
createContainer(node, perAllocationResource, createContainer(node, perAllocationResource,
schedulerKey); schedulerKey);
getMetrics().reserveResource(getUser(), getMetrics().reserveResource(node.getPartition(), getUser(),
reservedContainer.getResource()); reservedContainer.getResource());
RMContainer rmContainer = RMContainer rmContainer =
super.reserve(node, schedulerKey, null, reservedContainer); super.reserve(node, schedulerKey, null, reservedContainer);
@ -712,7 +714,7 @@ public class FSAppAttempt extends SchedulerApplicationAttempt
unreserveInternal(schedulerKey, node); unreserveInternal(schedulerKey, node);
node.unreserveResource(this); node.unreserveResource(this);
clearReservation(node); clearReservation(node);
getMetrics().unreserveResource( getMetrics().unreserveResource(node.getPartition(),
getUser(), rmContainer.getContainer().getResource()); getUser(), rmContainer.getContainer().getResource());
} }

View File

@ -37,6 +37,7 @@ import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.server.resourcemanager.MockRM; import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fifo.FifoScheduler; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fifo.FifoScheduler;
import org.apache.hadoop.yarn.server.utils.BuilderUtils; import org.apache.hadoop.yarn.server.utils.BuilderUtils;
@ -72,8 +73,10 @@ public class TestQueueMetrics {
metrics.submitAppAttempt(user); metrics.submitAppAttempt(user);
checkApps(queueSource, 1, 1, 0, 0, 0, 0, true); checkApps(queueSource, 1, 1, 0, 0, 0, 0, true);
metrics.setAvailableResourcesToQueue(Resources.createResource(100*GB, 100)); metrics.setAvailableResourcesToQueue(RMNodeLabelsManager.NO_LABEL,
metrics.incrPendingResources(user, 5, Resources.createResource(3*GB, 3)); Resources.createResource(100*GB, 100));
metrics.incrPendingResources(RMNodeLabelsManager.NO_LABEL,
user, 5, Resources.createResource(3*GB, 3));
// Available resources is set externally, as it depends on dynamic // Available resources is set externally, as it depends on dynamic
// configurable cluster/queue resources // configurable cluster/queue resources
checkResources(queueSource, 0, 0, 0, 0, 0, 100*GB, 100, 15*GB, 15, 5, 0, 0, 0); checkResources(queueSource, 0, 0, 0, 0, 0, 100*GB, 100, 15*GB, 15, 5, 0, 0, 0);
@ -81,17 +84,21 @@ public class TestQueueMetrics {
metrics.runAppAttempt(app.getApplicationId(), user); metrics.runAppAttempt(app.getApplicationId(), user);
checkApps(queueSource, 1, 0, 1, 0, 0, 0, true); checkApps(queueSource, 1, 0, 1, 0, 0, 0, true);
metrics.allocateResources(user, 3, Resources.createResource(2*GB, 2), true); metrics.allocateResources(RMNodeLabelsManager.NO_LABEL,
user, 3, Resources.createResource(2*GB, 2), true);
checkResources(queueSource, 6*GB, 6, 3, 3, 0, 100*GB, 100, 9*GB, 9, 2, 0, 0, 0); checkResources(queueSource, 6*GB, 6, 3, 3, 0, 100*GB, 100, 9*GB, 9, 2, 0, 0, 0);
metrics.releaseResources(user, 1, Resources.createResource(2*GB, 2)); metrics.releaseResources(RMNodeLabelsManager.NO_LABEL,
user, 1, Resources.createResource(2*GB, 2));
checkResources(queueSource, 4*GB, 4, 2, 3, 1, 100*GB, 100, 9*GB, 9, 2, 0, 0, 0); checkResources(queueSource, 4*GB, 4, 2, 3, 1, 100*GB, 100, 9*GB, 9, 2, 0, 0, 0);
metrics.incrPendingResources(user, 0, Resources.createResource(2 * GB, 2)); metrics.incrPendingResources(RMNodeLabelsManager.NO_LABEL,
user, 0, Resources.createResource(2 * GB, 2));
checkResources(queueSource, 4 * GB, 4, 2, 3, 1, 100 * GB, 100, 9 * GB, 9, 2, checkResources(queueSource, 4 * GB, 4, 2, 3, 1, 100 * GB, 100, 9 * GB, 9, 2,
0, 0, 0); 0, 0, 0);
metrics.decrPendingResources(user, 0, Resources.createResource(2 * GB, 2)); metrics.decrPendingResources(RMNodeLabelsManager.NO_LABEL,
user, 0, Resources.createResource(2 * GB, 2));
checkResources(queueSource, 4 * GB, 4, 2, 3, 1, 100 * GB, 100, 9 * GB, 9, 2, checkResources(queueSource, 4 * GB, 4, 2, 3, 1, 100 * GB, 100, 9 * GB, 9, 2,
0, 0, 0); 0, 0, 0);
@ -177,9 +184,12 @@ public class TestQueueMetrics {
checkApps(queueSource, 1, 1, 0, 0, 0, 0, true); checkApps(queueSource, 1, 1, 0, 0, 0, 0, true);
checkApps(userSource, 1, 1, 0, 0, 0, 0, true); checkApps(userSource, 1, 1, 0, 0, 0, 0, true);
metrics.setAvailableResourcesToQueue(Resources.createResource(100*GB, 100)); metrics.setAvailableResourcesToQueue(RMNodeLabelsManager.NO_LABEL,
metrics.setAvailableResourcesToUser(user, Resources.createResource(10*GB, 10)); Resources.createResource(100*GB, 100));
metrics.incrPendingResources(user, 5, Resources.createResource(3*GB, 3)); metrics.setAvailableResourcesToUser(RMNodeLabelsManager.NO_LABEL,
user, Resources.createResource(10*GB, 10));
metrics.incrPendingResources(RMNodeLabelsManager.NO_LABEL,
user, 5, Resources.createResource(3*GB, 3));
// Available resources is set externally, as it depends on dynamic // Available resources is set externally, as it depends on dynamic
// configurable cluster/queue resources // configurable cluster/queue resources
checkResources(queueSource, 0, 0, 0, 0, 0, 100*GB, 100, 15*GB, 15, 5, 0, 0, 0); checkResources(queueSource, 0, 0, 0, 0, 0, 100*GB, 100, 15*GB, 15, 5, 0, 0, 0);
@ -189,11 +199,13 @@ public class TestQueueMetrics {
checkApps(queueSource, 1, 0, 1, 0, 0, 0, true); checkApps(queueSource, 1, 0, 1, 0, 0, 0, true);
checkApps(userSource, 1, 0, 1, 0, 0, 0, true); checkApps(userSource, 1, 0, 1, 0, 0, 0, true);
metrics.allocateResources(user, 3, Resources.createResource(2*GB, 2), true); metrics.allocateResources(RMNodeLabelsManager.NO_LABEL,
user, 3, Resources.createResource(2*GB, 2), true);
checkResources(queueSource, 6*GB, 6, 3, 3, 0, 100*GB, 100, 9*GB, 9, 2, 0, 0, 0); checkResources(queueSource, 6*GB, 6, 3, 3, 0, 100*GB, 100, 9*GB, 9, 2, 0, 0, 0);
checkResources(userSource, 6*GB, 6, 3, 3, 0, 10*GB, 10, 9*GB, 9, 2, 0, 0, 0); checkResources(userSource, 6*GB, 6, 3, 3, 0, 10*GB, 10, 9*GB, 9, 2, 0, 0, 0);
metrics.releaseResources(user, 1, Resources.createResource(2*GB, 2)); metrics.releaseResources(RMNodeLabelsManager.NO_LABEL,
user, 1, Resources.createResource(2*GB, 2));
checkResources(queueSource, 4*GB, 4, 2, 3, 1, 100*GB, 100, 9*GB, 9, 2, 0, 0, 0); checkResources(queueSource, 4*GB, 4, 2, 3, 1, 100*GB, 100, 9*GB, 9, 2, 0, 0, 0);
checkResources(userSource, 4*GB, 4, 2, 3, 1, 10*GB, 10, 9*GB, 9, 2, 0, 0, 0); checkResources(userSource, 4*GB, 4, 2, 3, 1, 10*GB, 10, 9*GB, 9, 2, 0, 0, 0);
@ -283,11 +295,16 @@ public class TestQueueMetrics {
checkApps(userSource, 1, 1, 0, 0, 0, 0, true); checkApps(userSource, 1, 1, 0, 0, 0, 0, true);
checkApps(parentUserSource, 1, 1, 0, 0, 0, 0, true); checkApps(parentUserSource, 1, 1, 0, 0, 0, 0, true);
parentMetrics.setAvailableResourcesToQueue(Resources.createResource(100*GB, 100)); parentMetrics.setAvailableResourcesToQueue(RMNodeLabelsManager.NO_LABEL,
metrics.setAvailableResourcesToQueue(Resources.createResource(100*GB, 100)); Resources.createResource(100*GB, 100));
parentMetrics.setAvailableResourcesToUser(user, Resources.createResource(10*GB, 10)); metrics.setAvailableResourcesToQueue(RMNodeLabelsManager.NO_LABEL,
metrics.setAvailableResourcesToUser(user, Resources.createResource(10*GB, 10)); Resources.createResource(100*GB, 100));
metrics.incrPendingResources(user, 5, Resources.createResource(3*GB, 3)); parentMetrics.setAvailableResourcesToUser(RMNodeLabelsManager.NO_LABEL,
user, Resources.createResource(10*GB, 10));
metrics.setAvailableResourcesToUser(RMNodeLabelsManager.NO_LABEL,
user, Resources.createResource(10*GB, 10));
metrics.incrPendingResources(RMNodeLabelsManager.NO_LABEL,
user, 5, Resources.createResource(3*GB, 3));
checkResources(queueSource, 0, 0, 0, 0, 0, 100*GB, 100, 15*GB, 15, 5, 0, 0, 0); checkResources(queueSource, 0, 0, 0, 0, 0, 100*GB, 100, 15*GB, 15, 5, 0, 0, 0);
checkResources(parentQueueSource, 0, 0, 0, 0, 0, 100*GB, 100, 15*GB, 15, 5, 0, 0, 0); checkResources(parentQueueSource, 0, 0, 0, 0, 0, 100*GB, 100, 15*GB, 15, 5, 0, 0, 0);
checkResources(userSource, 0, 0, 0, 0, 0, 10*GB, 10, 15*GB, 15, 5, 0, 0, 0); checkResources(userSource, 0, 0, 0, 0, 0, 10*GB, 10, 15*GB, 15, 5, 0, 0, 0);
@ -297,8 +314,10 @@ public class TestQueueMetrics {
checkApps(queueSource, 1, 0, 1, 0, 0, 0, true); checkApps(queueSource, 1, 0, 1, 0, 0, 0, true);
checkApps(userSource, 1, 0, 1, 0, 0, 0, true); checkApps(userSource, 1, 0, 1, 0, 0, 0, true);
metrics.allocateResources(user, 3, Resources.createResource(2*GB, 2), true); metrics.allocateResources(RMNodeLabelsManager.NO_LABEL,
metrics.reserveResource(user, Resources.createResource(3*GB, 3)); user, 3, Resources.createResource(2*GB, 2), true);
metrics.reserveResource(RMNodeLabelsManager.NO_LABEL,
user, Resources.createResource(3*GB, 3));
// Available resources is set externally, as it depends on dynamic // Available resources is set externally, as it depends on dynamic
// configurable cluster/queue resources // configurable cluster/queue resources
checkResources(queueSource, 6*GB, 6, 3, 3, 0, 100*GB, 100, 9*GB, 9, 2, 3*GB, 3, 1); checkResources(queueSource, 6*GB, 6, 3, 3, 0, 100*GB, 100, 9*GB, 9, 2, 3*GB, 3, 1);
@ -306,8 +325,10 @@ public class TestQueueMetrics {
checkResources(userSource, 6*GB, 6, 3, 3, 0, 10*GB, 10, 9*GB, 9, 2, 3*GB, 3, 1); checkResources(userSource, 6*GB, 6, 3, 3, 0, 10*GB, 10, 9*GB, 9, 2, 3*GB, 3, 1);
checkResources(parentUserSource, 6*GB, 6, 3, 3, 0, 10*GB, 10, 9*GB, 9, 2, 3*GB, 3, 1); checkResources(parentUserSource, 6*GB, 6, 3, 3, 0, 10*GB, 10, 9*GB, 9, 2, 3*GB, 3, 1);
metrics.releaseResources(user, 1, Resources.createResource(2*GB, 2)); metrics.releaseResources(RMNodeLabelsManager.NO_LABEL,
metrics.unreserveResource(user, Resources.createResource(3*GB, 3)); user, 1, Resources.createResource(2*GB, 2));
metrics.unreserveResource(RMNodeLabelsManager.NO_LABEL,
user, Resources.createResource(3*GB, 3));
checkResources(queueSource, 4*GB, 4, 2, 3, 1, 100*GB, 100, 9*GB, 9, 2, 0, 0, 0); checkResources(queueSource, 4*GB, 4, 2, 3, 1, 100*GB, 100, 9*GB, 9, 2, 0, 0, 0);
checkResources(parentQueueSource, 4*GB, 4, 2, 3, 1, 100*GB, 100, 9*GB, 9, 2, 0, 0, 0); checkResources(parentQueueSource, 4*GB, 4, 2, 3, 1, 100*GB, 100, 9*GB, 9, 2, 0, 0, 0);
checkResources(userSource, 4*GB, 4, 2, 3, 1, 10*GB, 10, 9*GB, 9, 2, 0, 0, 0); checkResources(userSource, 4*GB, 4, 2, 3, 1, 10*GB, 10, 9*GB, 9, 2, 0, 0, 0);

View File

@ -17,6 +17,7 @@
*/ */
package org.apache.hadoop.yarn.server.resourcemanager.scheduler; package org.apache.hadoop.yarn.server.resourcemanager.scheduler;
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.TestUtils.toSchedulerKey;
import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertEquals;
import static org.mockito.Mockito.mock; import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when; import static org.mockito.Mockito.when;
@ -41,14 +42,11 @@ import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerImpl; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerImpl;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fifo.FifoScheduler; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fifo.FifoScheduler;
import org.apache.hadoop.yarn.server.scheduler.SchedulerRequestKey; import org.apache.hadoop.yarn.server.scheduler.SchedulerRequestKey;
import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator; import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator;
import org.junit.After; import org.junit.After;
import org.junit.Test; import org.junit.Test;
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.TestUtils.toSchedulerKey;
public class TestSchedulerApplicationAttempt { public class TestSchedulerApplicationAttempt {
private static final NodeId nodeId = NodeId.newInstance("somehost", 5); private static final NodeId nodeId = NodeId.newInstance("somehost", 5);
@ -103,7 +101,8 @@ public class TestSchedulerApplicationAttempt {
Map<NodeId, RMContainer> reservations = new HashMap<NodeId, RMContainer>(); Map<NodeId, RMContainer> reservations = new HashMap<NodeId, RMContainer>();
reservations.put(node.getNodeID(), container2); reservations.put(node.getNodeID(), container2);
app.reservedContainers.put(toSchedulerKey(prio1), reservations); app.reservedContainers.put(toSchedulerKey(prio1), reservations);
oldMetrics.reserveResource(user, reservedResource); oldMetrics.reserveResource(container2.getNodeLabelExpression(),
user, reservedResource);
checkQueueMetrics(oldMetrics, 1, 1, 1536, 2, 2048, 3, 3072, 4); checkQueueMetrics(oldMetrics, 1, 1, 1536, 2, 2048, 3, 3072, 4);
checkQueueMetrics(newMetrics, 0, 0, 0, 0, 0, 0, 0, 0); checkQueueMetrics(newMetrics, 0, 0, 0, 0, 0, 0, 0, 0);

View File

@ -2381,8 +2381,10 @@ public class TestCapacityScheduler {
sch.getApplicationAttempt(appAttemptId).getLiveContainersMap() sch.getApplicationAttempt(appAttemptId).getLiveContainersMap()
.put(newContainerId, rmContainer); .put(newContainerId, rmContainer);
QueueMetrics queueA1M = queueA1.getMetrics(); QueueMetrics queueA1M = queueA1.getMetrics();
queueA1M.incrPendingResources("user1", 1, resource); queueA1M.incrPendingResources(rmContainer.getNodeLabelExpression(),
queueA1M.allocateResources("user1", resource); "user1", 1, resource);
queueA1M.allocateResources(rmContainer.getNodeLabelExpression(),
"user1", resource);
// remove attempt // remove attempt
sch.handle(new AppAttemptRemovedSchedulerEvent(appAttemptId, sch.handle(new AppAttemptRemovedSchedulerEvent(appAttemptId,
RMAppAttemptState.KILLED, true)); RMAppAttemptState.KILLED, true));

View File

@ -1955,8 +1955,8 @@ public class TestNodeLabelContainerAllocation {
reportNm2.getAvailableResource().getMemorySize()); reportNm2.getAvailableResource().getMemorySize());
LeafQueue leafQueue = (LeafQueue) cs.getQueue("a"); LeafQueue leafQueue = (LeafQueue) cs.getQueue("a");
assertEquals(0 * GB, leafQueue.getMetrics().getAvailableMB()); assertEquals(5 * GB, leafQueue.getMetrics().getAvailableMB());
assertEquals(5 * GB, leafQueue.getMetrics().getAllocatedMB()); assertEquals(0 * GB, leafQueue.getMetrics().getAllocatedMB());
// Kill all apps in queue a // Kill all apps in queue a
cs.killAllAppsInQueue("a"); cs.killAllAppsInQueue("a");
@ -2058,8 +2058,8 @@ public class TestNodeLabelContainerAllocation {
double delta = 0.0001; double delta = 0.0001;
// 3GB is used from label x quota. 1.5 GB is remaining from default label. // 3GB is used from label x quota. 1.5 GB is remaining from default label.
// 2GB is remaining from label x. // 2GB is remaining from label x.
assertEquals(3.5 * GB, leafQueue.getMetrics().getAvailableMB(), delta); assertEquals(6.5 * GB, leafQueue.getMetrics().getAvailableMB(), delta);
assertEquals(4 * GB, leafQueue.getMetrics().getAllocatedMB()); assertEquals(1 * GB, leafQueue.getMetrics().getAllocatedMB());
// app1 asks for 1 default partition container // app1 asks for 1 default partition container
am1.allocate("*", 1 * GB, 5, new ArrayList<ContainerId>()); am1.allocate("*", 1 * GB, 5, new ArrayList<ContainerId>());
@ -2076,7 +2076,7 @@ public class TestNodeLabelContainerAllocation {
// 3GB is used from label x quota. 2GB used from default label. // 3GB is used from label x quota. 2GB used from default label.
// So total 2.5 GB is remaining. // So total 2.5 GB is remaining.
assertEquals(2.5 * GB, leafQueue.getMetrics().getAvailableMB(), delta); assertEquals(2.5 * GB, leafQueue.getMetrics().getAvailableMB(), delta);
assertEquals(5 * GB, leafQueue.getMetrics().getAllocatedMB()); assertEquals(2 * GB, leafQueue.getMetrics().getAllocatedMB());
rm1.close(); rm1.close();
} }