YARN-7728. Expose container preemptions related information in Capacity Scheduler queue metrics. Contributed by Eric Payne.
(cherry picked from commit 82cc6f6968
)
This commit is contained in:
parent
9ce24d2d50
commit
bed683d300
|
@ -68,6 +68,10 @@ public class QueueMetrics implements MetricsSource {
|
||||||
MutableCounterLong aggregateOffSwitchContainersAllocated;
|
MutableCounterLong aggregateOffSwitchContainersAllocated;
|
||||||
@Metric("Aggregate # of preempted containers") MutableCounterLong
|
@Metric("Aggregate # of preempted containers") MutableCounterLong
|
||||||
aggregateContainersPreempted;
|
aggregateContainersPreempted;
|
||||||
|
@Metric("Aggregate # of preempted memory seconds") MutableCounterLong
|
||||||
|
aggregateMemoryMBSecondsPreempted;
|
||||||
|
@Metric("Aggregate # of preempted vcore seconds") MutableCounterLong
|
||||||
|
aggregateVcoreSecondsPreempted;
|
||||||
@Metric("# of active users") MutableGaugeInt activeUsers;
|
@Metric("# of active users") MutableGaugeInt activeUsers;
|
||||||
@Metric("# of active applications") MutableGaugeInt activeApplications;
|
@Metric("# of active applications") MutableGaugeInt activeApplications;
|
||||||
@Metric("App Attempt First Container Allocation Delay")
|
@Metric("App Attempt First Container Allocation Delay")
|
||||||
|
@ -534,6 +538,20 @@ public class QueueMetrics implements MetricsSource {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public void updatePreemptedMemoryMBSeconds(long mbSeconds) {
|
||||||
|
aggregateMemoryMBSecondsPreempted.incr(mbSeconds);
|
||||||
|
if (parent != null) {
|
||||||
|
parent.updatePreemptedMemoryMBSeconds(mbSeconds);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public void updatePreemptedVcoreSeconds(long vcoreSeconds) {
|
||||||
|
aggregateVcoreSecondsPreempted.incr(vcoreSeconds);
|
||||||
|
if (parent != null) {
|
||||||
|
parent.updatePreemptedVcoreSeconds(vcoreSeconds);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
public void reserveResource(String partition, String user, Resource res) {
|
public void reserveResource(String partition, String user, Resource res) {
|
||||||
if(partition == null || partition.equals(RMNodeLabelsManager.NO_LABEL)) {
|
if(partition == null || partition.equals(RMNodeLabelsManager.NO_LABEL)) {
|
||||||
reserveResource(user, res);
|
reserveResource(user, res);
|
||||||
|
|
|
@ -38,6 +38,7 @@ import java.util.concurrent.LinkedBlockingQueue;
|
||||||
import java.util.concurrent.atomic.AtomicBoolean;
|
import java.util.concurrent.atomic.AtomicBoolean;
|
||||||
|
|
||||||
import org.apache.commons.lang.StringUtils;
|
import org.apache.commons.lang.StringUtils;
|
||||||
|
import org.apache.commons.lang.time.DateUtils;
|
||||||
import org.apache.commons.logging.Log;
|
import org.apache.commons.logging.Log;
|
||||||
import org.apache.commons.logging.LogFactory;
|
import org.apache.commons.logging.LogFactory;
|
||||||
import org.apache.hadoop.classification.InterfaceAudience.LimitedPrivate;
|
import org.apache.hadoop.classification.InterfaceAudience.LimitedPrivate;
|
||||||
|
@ -51,6 +52,7 @@ import org.apache.hadoop.util.Time;
|
||||||
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
|
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
|
||||||
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
||||||
import org.apache.hadoop.yarn.api.records.Container;
|
import org.apache.hadoop.yarn.api.records.Container;
|
||||||
|
import org.apache.hadoop.yarn.api.records.ContainerExitStatus;
|
||||||
import org.apache.hadoop.yarn.api.records.ContainerId;
|
import org.apache.hadoop.yarn.api.records.ContainerId;
|
||||||
import org.apache.hadoop.yarn.api.records.ContainerStatus;
|
import org.apache.hadoop.yarn.api.records.ContainerStatus;
|
||||||
import org.apache.hadoop.yarn.api.records.NodeId;
|
import org.apache.hadoop.yarn.api.records.NodeId;
|
||||||
|
@ -1711,6 +1713,23 @@ public class CapacityScheduler extends
|
||||||
LeafQueue queue = (LeafQueue) application.getQueue();
|
LeafQueue queue = (LeafQueue) application.getQueue();
|
||||||
queue.completedContainer(getClusterResource(), application, node,
|
queue.completedContainer(getClusterResource(), application, node,
|
||||||
rmContainer, containerStatus, event, null, true);
|
rmContainer, containerStatus, event, null, true);
|
||||||
|
if (ContainerExitStatus.PREEMPTED == containerStatus.getExitStatus()) {
|
||||||
|
updateQueuePreemptionMetrics(queue, rmContainer);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private void updateQueuePreemptionMetrics(
|
||||||
|
CSQueue queue, RMContainer rmc) {
|
||||||
|
QueueMetrics qMetrics = queue.getMetrics();
|
||||||
|
long usedMillis = rmc.getFinishTime() - rmc.getCreationTime();
|
||||||
|
Resource containerResource = rmc.getAllocatedResource();
|
||||||
|
qMetrics.preemptContainer();
|
||||||
|
long mbSeconds = (containerResource.getMemorySize() * usedMillis)
|
||||||
|
/ DateUtils.MILLIS_PER_SECOND;
|
||||||
|
long vcSeconds = (containerResource.getVirtualCores() * usedMillis)
|
||||||
|
/ DateUtils.MILLIS_PER_SECOND;
|
||||||
|
qMetrics.updatePreemptedMemoryMBSeconds(mbSeconds);
|
||||||
|
qMetrics.updatePreemptedVcoreSeconds(vcSeconds);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Lock(Lock.NoLock.class)
|
@Lock(Lock.NoLock.class)
|
||||||
|
|
|
@ -152,6 +152,10 @@ public class TestCapacitySchedulerSurgicalPreemption
|
||||||
waitNumberOfLiveContainersOnNodeFromApp(cs.getNode(rmNode2.getNodeID()),
|
waitNumberOfLiveContainersOnNodeFromApp(cs.getNode(rmNode2.getNodeID()),
|
||||||
am1.getApplicationAttemptId(), 16);
|
am1.getApplicationAttemptId(), 16);
|
||||||
|
|
||||||
|
// Ensure preemption metrics were recored.
|
||||||
|
Assert.assertEquals("Number of preempted containers incorrectly recorded:",
|
||||||
|
4, cs.getQueue("root").getMetrics().getAggregatePreemptedContainers());
|
||||||
|
|
||||||
rm1.close();
|
rm1.close();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue