YARN-598. Add virtual cores to queue metrics. (sandyr via tucu)

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1480816 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Alejandro Abdelnur 2013-05-09 22:23:50 +00:00
parent 51ccb87031
commit f19465f72f
3 changed files with 76 additions and 42 deletions

View File

@ -238,6 +238,8 @@ Release 2.0.5-beta - UNRELEASED
YARN-568. Add support for work preserving preemption to the FairScheduler. YARN-568. Add support for work preserving preemption to the FairScheduler.
(Carlo Curino and Sandy Ryza via cdouglas) (Carlo Curino and Sandy Ryza via cdouglas)
YARN-598. Add virtual cores to queue metrics. (sandyr via tucu)
OPTIMIZATIONS OPTIMIZATIONS
BUG FIXES BUG FIXES

View File

@ -60,13 +60,17 @@ public class QueueMetrics implements MetricsSource {
@Metric("# of apps failed") MutableGaugeInt appsFailed; @Metric("# of apps failed") MutableGaugeInt appsFailed;
@Metric("Allocated memory in MB") MutableGaugeInt allocatedMB; @Metric("Allocated memory in MB") MutableGaugeInt allocatedMB;
@Metric("Allocated CPU in virtual cores") MutableGaugeInt allocatedVCores;
@Metric("# of allocated containers") MutableGaugeInt allocatedContainers; @Metric("# of allocated containers") MutableGaugeInt allocatedContainers;
@Metric("Aggregate # of allocated containers") MutableCounterLong aggregateContainersAllocated; @Metric("Aggregate # of allocated containers") MutableCounterLong aggregateContainersAllocated;
@Metric("Aggregate # of released containers") MutableCounterLong aggregateContainersReleased; @Metric("Aggregate # of released containers") MutableCounterLong aggregateContainersReleased;
@Metric("Available memory in MB") MutableGaugeInt availableMB; @Metric("Available memory in MB") MutableGaugeInt availableMB;
@Metric("Available CPU in virtual cores") MutableGaugeInt availableVCores;
@Metric("Pending memory allocation in MB") MutableGaugeInt pendingMB; @Metric("Pending memory allocation in MB") MutableGaugeInt pendingMB;
@Metric("Pending CPU allocation in virtual cores") MutableGaugeInt pendingVCores;
@Metric("# of pending containers") MutableGaugeInt pendingContainers; @Metric("# of pending containers") MutableGaugeInt pendingContainers;
@Metric("# of reserved memory in MB") MutableGaugeInt reservedMB; @Metric("# of reserved memory in MB") MutableGaugeInt reservedMB;
@Metric("Reserved CPU in virtual cores") MutableGaugeInt reservedVCores;
@Metric("# of reserved containers") MutableGaugeInt reservedContainers; @Metric("# of reserved containers") MutableGaugeInt reservedContainers;
@Metric("# of active users") MutableGaugeInt activeUsers; @Metric("# of active users") MutableGaugeInt activeUsers;
@Metric("# of active users") MutableGaugeInt activeApplications; @Metric("# of active users") MutableGaugeInt activeApplications;
@ -268,6 +272,7 @@ public class QueueMetrics implements MetricsSource {
*/ */
public void setAvailableResourcesToQueue(Resource limit) { public void setAvailableResourcesToQueue(Resource limit) {
availableMB.set(limit.getMemory()); availableMB.set(limit.getMemory());
availableVCores.set(limit.getVirtualCores());
} }
/** /**
@ -304,6 +309,7 @@ public class QueueMetrics implements MetricsSource {
private void _incrPendingResources(int containers, Resource res) { private void _incrPendingResources(int containers, Resource res) {
pendingContainers.incr(containers); pendingContainers.incr(containers);
pendingMB.incr(res.getMemory()); pendingMB.incr(res.getMemory());
pendingVCores.incr(res.getVirtualCores());
} }
public void decrPendingResources(String user, int containers, Resource res) { public void decrPendingResources(String user, int containers, Resource res) {
@ -320,12 +326,14 @@ public class QueueMetrics implements MetricsSource {
private void _decrPendingResources(int containers, Resource res) { private void _decrPendingResources(int containers, Resource res) {
pendingContainers.decr(containers); pendingContainers.decr(containers);
pendingMB.decr(res.getMemory()); pendingMB.decr(res.getMemory());
pendingVCores.decr(res.getVirtualCores());
} }
public void allocateResources(String user, int containers, Resource res) { public void allocateResources(String user, int containers, Resource res) {
allocatedContainers.incr(containers); allocatedContainers.incr(containers);
aggregateContainersAllocated.incr(containers); aggregateContainersAllocated.incr(containers);
allocatedMB.incr(res.getMemory() * containers); allocatedMB.incr(res.getMemory() * containers);
allocatedVCores.incr(res.getVirtualCores() * containers);
_decrPendingResources(containers, Resources.multiply(res, containers)); _decrPendingResources(containers, Resources.multiply(res, containers));
QueueMetrics userMetrics = getUserMetrics(user); QueueMetrics userMetrics = getUserMetrics(user);
if (userMetrics != null) { if (userMetrics != null) {
@ -340,6 +348,7 @@ public class QueueMetrics implements MetricsSource {
allocatedContainers.decr(containers); allocatedContainers.decr(containers);
aggregateContainersReleased.incr(containers); aggregateContainersReleased.incr(containers);
allocatedMB.decr(res.getMemory() * containers); allocatedMB.decr(res.getMemory() * containers);
allocatedVCores.decr(res.getVirtualCores() * containers);
QueueMetrics userMetrics = getUserMetrics(user); QueueMetrics userMetrics = getUserMetrics(user);
if (userMetrics != null) { if (userMetrics != null) {
userMetrics.releaseResources(user, containers, res); userMetrics.releaseResources(user, containers, res);
@ -352,6 +361,7 @@ public class QueueMetrics implements MetricsSource {
public void reserveResource(String user, Resource res) { public void reserveResource(String user, Resource res) {
reservedContainers.incr(); reservedContainers.incr();
reservedMB.incr(res.getMemory()); reservedMB.incr(res.getMemory());
reservedVCores.incr(res.getVirtualCores());
QueueMetrics userMetrics = getUserMetrics(user); QueueMetrics userMetrics = getUserMetrics(user);
if (userMetrics != null) { if (userMetrics != null) {
userMetrics.reserveResource(user, res); userMetrics.reserveResource(user, res);
@ -364,6 +374,7 @@ public class QueueMetrics implements MetricsSource {
public void unreserveResource(String user, Resource res) { public void unreserveResource(String user, Resource res) {
reservedContainers.decr(); reservedContainers.decr();
reservedMB.decr(res.getMemory()); reservedMB.decr(res.getMemory());
reservedVCores.decr(res.getVirtualCores());
QueueMetrics userMetrics = getUserMetrics(user); QueueMetrics userMetrics = getUserMetrics(user);
if (userMetrics != null) { if (userMetrics != null) {
userMetrics.unreserveResource(user, res); userMetrics.unreserveResource(user, res);
@ -434,6 +445,10 @@ public class QueueMetrics implements MetricsSource {
public int getAllocatedMB() { public int getAllocatedMB() {
return allocatedMB.value(); return allocatedMB.value();
} }
public int getAllocatedVirtualCores() {
return allocatedVCores.value();
}
public int getAllocatedContainers() { public int getAllocatedContainers() {
return allocatedContainers.value(); return allocatedContainers.value();
@ -442,10 +457,18 @@ public class QueueMetrics implements MetricsSource {
public int getAvailableMB() { public int getAvailableMB() {
return availableMB.value(); return availableMB.value();
} }
public int getAvailableVirtualCores() {
return availableVCores.value();
}
public int getPendingMB() { public int getPendingMB() {
return pendingMB.value(); return pendingMB.value();
} }
public int getPendingVirtualCores() {
return pendingVCores.value();
}
public int getPendingContainers() { public int getPendingContainers() {
return pendingContainers.value(); return pendingContainers.value();
@ -454,6 +477,10 @@ public class QueueMetrics implements MetricsSource {
public int getReservedMB() { public int getReservedMB() {
return reservedMB.value(); return reservedMB.value();
} }
public int getReservedVirtualCores() {
return reservedVCores.value();
}
public int getReservedContainers() { public int getReservedContainers() {
return reservedContainers.value(); return reservedContainers.value();

View File

@ -66,20 +66,20 @@ public class TestQueueMetrics {
MetricsSource userSource = userSource(ms, queueName, user); MetricsSource userSource = userSource(ms, queueName, user);
checkApps(queueSource, 1, 1, 0, 0, 0, 0); checkApps(queueSource, 1, 1, 0, 0, 0, 0);
metrics.setAvailableResourcesToQueue(Resources.createResource(100*GB)); metrics.setAvailableResourcesToQueue(Resources.createResource(100*GB, 100));
metrics.incrPendingResources(user, 5, Resources.createResource(15*GB)); metrics.incrPendingResources(user, 5, Resources.createResource(15*GB, 15));
// Available resources is set externally, as it depends on dynamic // Available resources is set externally, as it depends on dynamic
// configurable cluster/queue resources // configurable cluster/queue resources
checkResources(queueSource, 0, 0, 0, 0, 100*GB, 15*GB, 5, 0, 0); checkResources(queueSource, 0, 0, 0, 0, 0, 100*GB, 100, 15*GB, 15, 5, 0, 0, 0);
metrics.incrAppsRunning(app, user); metrics.incrAppsRunning(app, user);
checkApps(queueSource, 1, 0, 1, 0, 0, 0); checkApps(queueSource, 1, 0, 1, 0, 0, 0);
metrics.allocateResources(user, 3, Resources.createResource(2*GB)); metrics.allocateResources(user, 3, Resources.createResource(2*GB, 2));
checkResources(queueSource, 6*GB, 3, 3, 0, 100*GB, 9*GB, 2, 0, 0); checkResources(queueSource, 6*GB, 6, 3, 3, 0, 100*GB, 100, 9*GB, 9, 2, 0, 0, 0);
metrics.releaseResources(user, 1, Resources.createResource(2*GB)); metrics.releaseResources(user, 1, Resources.createResource(2*GB, 2));
checkResources(queueSource, 4*GB, 2, 3, 1, 100*GB, 9*GB, 2, 0, 0); checkResources(queueSource, 4*GB, 4, 2, 3, 1, 100*GB, 100, 9*GB, 9, 2, 0, 0, 0);
metrics.finishApp(app, RMAppAttemptState.FINISHED); metrics.finishApp(app, RMAppAttemptState.FINISHED);
checkApps(queueSource, 1, 0, 0, 1, 0, 0); checkApps(queueSource, 1, 0, 0, 1, 0, 0);
@ -148,25 +148,25 @@ public class TestQueueMetrics {
checkApps(queueSource, 1, 1, 0, 0, 0, 0); checkApps(queueSource, 1, 1, 0, 0, 0, 0);
checkApps(userSource, 1, 1, 0, 0, 0, 0); checkApps(userSource, 1, 1, 0, 0, 0, 0);
metrics.setAvailableResourcesToQueue(Resources.createResource(100*GB)); metrics.setAvailableResourcesToQueue(Resources.createResource(100*GB, 100));
metrics.setAvailableResourcesToUser(user, Resources.createResource(10*GB)); metrics.setAvailableResourcesToUser(user, Resources.createResource(10*GB, 10));
metrics.incrPendingResources(user, 5, Resources.createResource(15*GB)); metrics.incrPendingResources(user, 5, Resources.createResource(15*GB, 15));
// Available resources is set externally, as it depends on dynamic // Available resources is set externally, as it depends on dynamic
// configurable cluster/queue resources // configurable cluster/queue resources
checkResources(queueSource, 0, 0, 0, 0, 100*GB, 15*GB, 5, 0, 0); checkResources(queueSource, 0, 0, 0, 0, 0, 100*GB, 100, 15*GB, 15, 5, 0, 0, 0);
checkResources(userSource, 0, 0, 0, 0, 10*GB, 15*GB, 5, 0, 0); checkResources(userSource, 0, 0, 0, 0, 0, 10*GB, 10, 15*GB, 15, 5, 0, 0, 0);
metrics.incrAppsRunning(app, user); metrics.incrAppsRunning(app, user);
checkApps(queueSource, 1, 0, 1, 0, 0, 0); checkApps(queueSource, 1, 0, 1, 0, 0, 0);
checkApps(userSource, 1, 0, 1, 0, 0, 0); checkApps(userSource, 1, 0, 1, 0, 0, 0);
metrics.allocateResources(user, 3, Resources.createResource(2*GB)); metrics.allocateResources(user, 3, Resources.createResource(2*GB, 2));
checkResources(queueSource, 6*GB, 3, 3, 0, 100*GB, 9*GB, 2, 0, 0); checkResources(queueSource, 6*GB, 6, 3, 3, 0, 100*GB, 100, 9*GB, 9, 2, 0, 0, 0);
checkResources(userSource, 6*GB, 3, 3, 0, 10*GB, 9*GB, 2, 0, 0); checkResources(userSource, 6*GB, 6, 3, 3, 0, 10*GB, 10, 9*GB, 9, 2, 0, 0, 0);
metrics.releaseResources(user, 1, Resources.createResource(2*GB)); metrics.releaseResources(user, 1, Resources.createResource(2*GB, 2));
checkResources(queueSource, 4*GB, 2, 3, 1, 100*GB, 9*GB, 2, 0, 0); checkResources(queueSource, 4*GB, 4, 2, 3, 1, 100*GB, 100, 9*GB, 9, 2, 0, 0, 0);
checkResources(userSource, 4*GB, 2, 3, 1, 10*GB, 9*GB, 2, 0, 0); checkResources(userSource, 4*GB, 4, 2, 3, 1, 10*GB, 10, 9*GB, 9, 2, 0, 0, 0);
metrics.finishApp(app, RMAppAttemptState.FINISHED); metrics.finishApp(app, RMAppAttemptState.FINISHED);
checkApps(queueSource, 1, 0, 0, 1, 0, 0); checkApps(queueSource, 1, 0, 0, 1, 0, 0);
@ -197,35 +197,35 @@ public class TestQueueMetrics {
checkApps(userSource, 1, 1, 0, 0, 0, 0); checkApps(userSource, 1, 1, 0, 0, 0, 0);
checkApps(parentUserSource, 1, 1, 0, 0, 0, 0); checkApps(parentUserSource, 1, 1, 0, 0, 0, 0);
parentMetrics.setAvailableResourcesToQueue(Resources.createResource(100*GB)); parentMetrics.setAvailableResourcesToQueue(Resources.createResource(100*GB, 100));
metrics.setAvailableResourcesToQueue(Resources.createResource(100*GB)); metrics.setAvailableResourcesToQueue(Resources.createResource(100*GB, 100));
parentMetrics.setAvailableResourcesToUser(user, Resources.createResource(10*GB)); parentMetrics.setAvailableResourcesToUser(user, Resources.createResource(10*GB, 10));
metrics.setAvailableResourcesToUser(user, Resources.createResource(10*GB)); metrics.setAvailableResourcesToUser(user, Resources.createResource(10*GB, 10));
metrics.incrPendingResources(user, 5, Resources.createResource(15*GB)); metrics.incrPendingResources(user, 5, Resources.createResource(15*GB, 15));
checkResources(queueSource, 0, 0, 0, 0, 100*GB, 15*GB, 5, 0, 0); checkResources(queueSource, 0, 0, 0, 0, 0, 100*GB, 100, 15*GB, 15, 5, 0, 0, 0);
checkResources(parentQueueSource, 0, 0, 0, 0, 100*GB, 15*GB, 5, 0, 0); checkResources(parentQueueSource, 0, 0, 0, 0, 0, 100*GB, 100, 15*GB, 15, 5, 0, 0, 0);
checkResources(userSource, 0, 0, 0, 0, 10*GB, 15*GB, 5, 0, 0); checkResources(userSource, 0, 0, 0, 0, 0, 10*GB, 10, 15*GB, 15, 5, 0, 0, 0);
checkResources(parentUserSource, 0, 0, 0, 0, 10*GB, 15*GB, 5, 0, 0); checkResources(parentUserSource, 0, 0, 0, 0, 0, 10*GB, 10, 15*GB, 15, 5, 0, 0, 0);
metrics.incrAppsRunning(app, user); metrics.incrAppsRunning(app, user);
checkApps(queueSource, 1, 0, 1, 0, 0, 0); checkApps(queueSource, 1, 0, 1, 0, 0, 0);
checkApps(userSource, 1, 0, 1, 0, 0, 0); checkApps(userSource, 1, 0, 1, 0, 0, 0);
metrics.allocateResources(user, 3, Resources.createResource(2*GB)); metrics.allocateResources(user, 3, Resources.createResource(2*GB, 2));
metrics.reserveResource(user, Resources.createResource(3*GB)); metrics.reserveResource(user, Resources.createResource(3*GB, 3));
// Available resources is set externally, as it depends on dynamic // Available resources is set externally, as it depends on dynamic
// configurable cluster/queue resources // configurable cluster/queue resources
checkResources(queueSource, 6*GB, 3, 3, 0, 100*GB, 9*GB, 2, 3*GB, 1); checkResources(queueSource, 6*GB, 6, 3, 3, 0, 100*GB, 100, 9*GB, 9, 2, 3*GB, 3, 1);
checkResources(parentQueueSource, 6*GB, 3, 3, 0, 100*GB, 9*GB, 2, 3*GB, 1); checkResources(parentQueueSource, 6*GB, 6, 3, 3, 0, 100*GB, 100, 9*GB, 9, 2, 3*GB, 3, 1);
checkResources(userSource, 6*GB, 3, 3, 0, 10*GB, 9*GB, 2, 3*GB, 1); checkResources(userSource, 6*GB, 6, 3, 3, 0, 10*GB, 10, 9*GB, 9, 2, 3*GB, 3, 1);
checkResources(parentUserSource, 6*GB, 3, 3, 0, 10*GB, 9*GB, 2, 3*GB, 1); checkResources(parentUserSource, 6*GB, 6, 3, 3, 0, 10*GB, 10, 9*GB, 9, 2, 3*GB, 3, 1);
metrics.releaseResources(user, 1, Resources.createResource(2*GB)); metrics.releaseResources(user, 1, Resources.createResource(2*GB, 2));
metrics.unreserveResource(user, Resources.createResource(3*GB)); metrics.unreserveResource(user, Resources.createResource(3*GB, 3));
checkResources(queueSource, 4*GB, 2, 3, 1, 100*GB, 9*GB, 2, 0, 0); checkResources(queueSource, 4*GB, 4, 2, 3, 1, 100*GB, 100, 9*GB, 9, 2, 0, 0, 0);
checkResources(parentQueueSource, 4*GB, 2, 3, 1, 100*GB, 9*GB, 2, 0, 0); checkResources(parentQueueSource, 4*GB, 4, 2, 3, 1, 100*GB, 100, 9*GB, 9, 2, 0, 0, 0);
checkResources(userSource, 4*GB, 2, 3, 1, 10*GB, 9*GB, 2, 0, 0); checkResources(userSource, 4*GB, 4, 2, 3, 1, 10*GB, 10, 9*GB, 9, 2, 0, 0, 0);
checkResources(parentUserSource, 4*GB, 2, 3, 1, 10*GB, 9*GB, 2, 0, 0); checkResources(parentUserSource, 4*GB, 4, 2, 3, 1, 10*GB, 10, 9*GB, 9, 2, 0, 0, 0);
metrics.finishApp(app, RMAppAttemptState.FINISHED); metrics.finishApp(app, RMAppAttemptState.FINISHED);
checkApps(queueSource, 1, 0, 0, 1, 0, 0); checkApps(queueSource, 1, 0, 0, 1, 0, 0);
@ -277,18 +277,23 @@ public class TestQueueMetrics {
} }
public static void checkResources(MetricsSource source, int allocatedMB, public static void checkResources(MetricsSource source, int allocatedMB,
int allocCtnrs, long aggreAllocCtnrs, long aggreReleasedCtnrs, int allocatedCores, int allocCtnrs, long aggreAllocCtnrs,
int availableMB, int pendingMB, int pendingCtnrs, long aggreReleasedCtnrs, int availableMB, int availableCores, int pendingMB,
int reservedMB, int reservedCtnrs) { int pendingCores, int pendingCtnrs, int reservedMB, int reservedCores,
int reservedCtnrs) {
MetricsRecordBuilder rb = getMetrics(source); MetricsRecordBuilder rb = getMetrics(source);
assertGauge("AllocatedMB", allocatedMB, rb); assertGauge("AllocatedMB", allocatedMB, rb);
assertGauge("AllocatedVCores", allocatedCores, rb);
assertGauge("AllocatedContainers", allocCtnrs, rb); assertGauge("AllocatedContainers", allocCtnrs, rb);
assertCounter("AggregateContainersAllocated", aggreAllocCtnrs, rb); assertCounter("AggregateContainersAllocated", aggreAllocCtnrs, rb);
assertCounter("AggregateContainersReleased", aggreReleasedCtnrs, rb); assertCounter("AggregateContainersReleased", aggreReleasedCtnrs, rb);
assertGauge("AvailableMB", availableMB, rb); assertGauge("AvailableMB", availableMB, rb);
assertGauge("AvailableVCores", availableCores, rb);
assertGauge("PendingMB", pendingMB, rb); assertGauge("PendingMB", pendingMB, rb);
assertGauge("PendingVCores", pendingCores, rb);
assertGauge("PendingContainers", pendingCtnrs, rb); assertGauge("PendingContainers", pendingCtnrs, rb);
assertGauge("ReservedMB", reservedMB, rb); assertGauge("ReservedMB", reservedMB, rb);
assertGauge("ReservedVCores", reservedCores, rb);
assertGauge("ReservedContainers", reservedCtnrs, rb); assertGauge("ReservedContainers", reservedCtnrs, rb);
} }