YARN-598. Add virtual cores to queue metrics. (sandyr via tucu)

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1480817 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Alejandro Abdelnur 2013-05-09 22:24:29 +00:00
parent de8b9c94a4
commit 98f63813b4
3 changed files with 76 additions and 42 deletions

View File

@ -160,6 +160,8 @@ Release 2.0.5-beta - UNRELEASED
tokens for app attempt so that RM can be restarted while preserving current
applications. (Jian He via vinodkv)
YARN-598. Add virtual cores to queue metrics. (sandyr via tucu)
OPTIMIZATIONS
BUG FIXES

View File

@ -60,13 +60,17 @@ public class QueueMetrics implements MetricsSource {
// Application counters.
@Metric("# of apps failed") MutableGaugeInt appsFailed;

// Resources currently allocated, plus lifetime container totals.
@Metric("Allocated memory in MB") MutableGaugeInt allocatedMB;
@Metric("Allocated CPU in virtual cores") MutableGaugeInt allocatedVCores;
@Metric("# of allocated containers") MutableGaugeInt allocatedContainers;
@Metric("Aggregate # of allocated containers") MutableCounterLong aggregateContainersAllocated;
@Metric("Aggregate # of released containers") MutableCounterLong aggregateContainersReleased;

// Resources still available.
@Metric("Available memory in MB") MutableGaugeInt availableMB;
@Metric("Available CPU in virtual cores") MutableGaugeInt availableVCores;

// Resources requested but not yet allocated.
@Metric("Pending memory allocation in MB") MutableGaugeInt pendingMB;
@Metric("Pending CPU allocation in virtual cores") MutableGaugeInt pendingVCores;
@Metric("# of pending containers") MutableGaugeInt pendingContainers;

// Resources reserved.
@Metric("# of reserved memory in MB") MutableGaugeInt reservedMB;
@Metric("Reserved CPU in virtual cores") MutableGaugeInt reservedVCores;
@Metric("# of reserved containers") MutableGaugeInt reservedContainers;

// Activity gauges. The description of activeApplications previously repeated
// "# of active users", so the two metrics were indistinguishable by description.
@Metric("# of active users") MutableGaugeInt activeUsers;
@Metric("# of active applications") MutableGaugeInt activeApplications;
@ -268,6 +272,7 @@ public class QueueMetrics implements MetricsSource {
*/ */
public void setAvailableResourcesToQueue(Resource limit) { public void setAvailableResourcesToQueue(Resource limit) {
availableMB.set(limit.getMemory()); availableMB.set(limit.getMemory());
availableVCores.set(limit.getVirtualCores());
} }
/** /**
@ -304,6 +309,7 @@ public class QueueMetrics implements MetricsSource {
private void _incrPendingResources(int containers, Resource res) { private void _incrPendingResources(int containers, Resource res) {
pendingContainers.incr(containers); pendingContainers.incr(containers);
pendingMB.incr(res.getMemory()); pendingMB.incr(res.getMemory());
pendingVCores.incr(res.getVirtualCores());
} }
public void decrPendingResources(String user, int containers, Resource res) { public void decrPendingResources(String user, int containers, Resource res) {
@ -320,12 +326,14 @@ public class QueueMetrics implements MetricsSource {
private void _decrPendingResources(int containers, Resource res) { private void _decrPendingResources(int containers, Resource res) {
pendingContainers.decr(containers); pendingContainers.decr(containers);
pendingMB.decr(res.getMemory()); pendingMB.decr(res.getMemory());
pendingVCores.decr(res.getVirtualCores());
} }
public void allocateResources(String user, int containers, Resource res) { public void allocateResources(String user, int containers, Resource res) {
allocatedContainers.incr(containers); allocatedContainers.incr(containers);
aggregateContainersAllocated.incr(containers); aggregateContainersAllocated.incr(containers);
allocatedMB.incr(res.getMemory() * containers); allocatedMB.incr(res.getMemory() * containers);
allocatedVCores.incr(res.getVirtualCores() * containers);
_decrPendingResources(containers, Resources.multiply(res, containers)); _decrPendingResources(containers, Resources.multiply(res, containers));
QueueMetrics userMetrics = getUserMetrics(user); QueueMetrics userMetrics = getUserMetrics(user);
if (userMetrics != null) { if (userMetrics != null) {
@ -340,6 +348,7 @@ public class QueueMetrics implements MetricsSource {
allocatedContainers.decr(containers); allocatedContainers.decr(containers);
aggregateContainersReleased.incr(containers); aggregateContainersReleased.incr(containers);
allocatedMB.decr(res.getMemory() * containers); allocatedMB.decr(res.getMemory() * containers);
allocatedVCores.decr(res.getVirtualCores() * containers);
QueueMetrics userMetrics = getUserMetrics(user); QueueMetrics userMetrics = getUserMetrics(user);
if (userMetrics != null) { if (userMetrics != null) {
userMetrics.releaseResources(user, containers, res); userMetrics.releaseResources(user, containers, res);
@ -352,6 +361,7 @@ public class QueueMetrics implements MetricsSource {
public void reserveResource(String user, Resource res) { public void reserveResource(String user, Resource res) {
reservedContainers.incr(); reservedContainers.incr();
reservedMB.incr(res.getMemory()); reservedMB.incr(res.getMemory());
reservedVCores.incr(res.getVirtualCores());
QueueMetrics userMetrics = getUserMetrics(user); QueueMetrics userMetrics = getUserMetrics(user);
if (userMetrics != null) { if (userMetrics != null) {
userMetrics.reserveResource(user, res); userMetrics.reserveResource(user, res);
@ -364,6 +374,7 @@ public class QueueMetrics implements MetricsSource {
public void unreserveResource(String user, Resource res) { public void unreserveResource(String user, Resource res) {
reservedContainers.decr(); reservedContainers.decr();
reservedMB.decr(res.getMemory()); reservedMB.decr(res.getMemory());
reservedVCores.decr(res.getVirtualCores());
QueueMetrics userMetrics = getUserMetrics(user); QueueMetrics userMetrics = getUserMetrics(user);
if (userMetrics != null) { if (userMetrics != null) {
userMetrics.unreserveResource(user, res); userMetrics.unreserveResource(user, res);
@ -435,6 +446,10 @@ public class QueueMetrics implements MetricsSource {
return allocatedMB.value(); return allocatedMB.value();
} }
/**
 * Reads the {@code allocatedVCores} gauge.
 *
 * @return the gauge's current value (allocated CPU in virtual cores)
 */
public int getAllocatedVirtualCores() {
  final int vcores = allocatedVCores.value();
  return vcores;
}
public int getAllocatedContainers() { public int getAllocatedContainers() {
return allocatedContainers.value(); return allocatedContainers.value();
} }
@ -443,10 +458,18 @@ public class QueueMetrics implements MetricsSource {
return availableMB.value(); return availableMB.value();
} }
/**
 * Reads the {@code availableVCores} gauge.
 *
 * @return the gauge's current value (available CPU in virtual cores)
 */
public int getAvailableVirtualCores() {
  final int vcores = availableVCores.value();
  return vcores;
}
public int getPendingMB() { public int getPendingMB() {
return pendingMB.value(); return pendingMB.value();
} }
/**
 * Reads the {@code pendingVCores} gauge.
 *
 * @return the gauge's current value (pending CPU allocation in virtual cores)
 */
public int getPendingVirtualCores() {
  final int vcores = pendingVCores.value();
  return vcores;
}
public int getPendingContainers() { public int getPendingContainers() {
return pendingContainers.value(); return pendingContainers.value();
} }
@ -455,6 +478,10 @@ public class QueueMetrics implements MetricsSource {
return reservedMB.value(); return reservedMB.value();
} }
/**
 * Reads the {@code reservedVCores} gauge.
 *
 * @return the gauge's current value (reserved CPU in virtual cores)
 */
public int getReservedVirtualCores() {
  final int vcores = reservedVCores.value();
  return vcores;
}
public int getReservedContainers() { public int getReservedContainers() {
return reservedContainers.value(); return reservedContainers.value();
} }

View File

@ -66,20 +66,20 @@ public class TestQueueMetrics {
MetricsSource userSource = userSource(ms, queueName, user); MetricsSource userSource = userSource(ms, queueName, user);
checkApps(queueSource, 1, 1, 0, 0, 0, 0); checkApps(queueSource, 1, 1, 0, 0, 0, 0);
metrics.setAvailableResourcesToQueue(Resources.createResource(100*GB)); metrics.setAvailableResourcesToQueue(Resources.createResource(100*GB, 100));
metrics.incrPendingResources(user, 5, Resources.createResource(15*GB)); metrics.incrPendingResources(user, 5, Resources.createResource(15*GB, 15));
// Available resources is set externally, as it depends on dynamic // Available resources is set externally, as it depends on dynamic
// configurable cluster/queue resources // configurable cluster/queue resources
checkResources(queueSource, 0, 0, 0, 0, 100*GB, 15*GB, 5, 0, 0); checkResources(queueSource, 0, 0, 0, 0, 0, 100*GB, 100, 15*GB, 15, 5, 0, 0, 0);
metrics.incrAppsRunning(app, user); metrics.incrAppsRunning(app, user);
checkApps(queueSource, 1, 0, 1, 0, 0, 0); checkApps(queueSource, 1, 0, 1, 0, 0, 0);
metrics.allocateResources(user, 3, Resources.createResource(2*GB)); metrics.allocateResources(user, 3, Resources.createResource(2*GB, 2));
checkResources(queueSource, 6*GB, 3, 3, 0, 100*GB, 9*GB, 2, 0, 0); checkResources(queueSource, 6*GB, 6, 3, 3, 0, 100*GB, 100, 9*GB, 9, 2, 0, 0, 0);
metrics.releaseResources(user, 1, Resources.createResource(2*GB)); metrics.releaseResources(user, 1, Resources.createResource(2*GB, 2));
checkResources(queueSource, 4*GB, 2, 3, 1, 100*GB, 9*GB, 2, 0, 0); checkResources(queueSource, 4*GB, 4, 2, 3, 1, 100*GB, 100, 9*GB, 9, 2, 0, 0, 0);
metrics.finishApp(app, RMAppAttemptState.FINISHED); metrics.finishApp(app, RMAppAttemptState.FINISHED);
checkApps(queueSource, 1, 0, 0, 1, 0, 0); checkApps(queueSource, 1, 0, 0, 1, 0, 0);
@ -148,25 +148,25 @@ public class TestQueueMetrics {
checkApps(queueSource, 1, 1, 0, 0, 0, 0); checkApps(queueSource, 1, 1, 0, 0, 0, 0);
checkApps(userSource, 1, 1, 0, 0, 0, 0); checkApps(userSource, 1, 1, 0, 0, 0, 0);
metrics.setAvailableResourcesToQueue(Resources.createResource(100*GB)); metrics.setAvailableResourcesToQueue(Resources.createResource(100*GB, 100));
metrics.setAvailableResourcesToUser(user, Resources.createResource(10*GB)); metrics.setAvailableResourcesToUser(user, Resources.createResource(10*GB, 10));
metrics.incrPendingResources(user, 5, Resources.createResource(15*GB)); metrics.incrPendingResources(user, 5, Resources.createResource(15*GB, 15));
// Available resources is set externally, as it depends on dynamic // Available resources is set externally, as it depends on dynamic
// configurable cluster/queue resources // configurable cluster/queue resources
checkResources(queueSource, 0, 0, 0, 0, 100*GB, 15*GB, 5, 0, 0); checkResources(queueSource, 0, 0, 0, 0, 0, 100*GB, 100, 15*GB, 15, 5, 0, 0, 0);
checkResources(userSource, 0, 0, 0, 0, 10*GB, 15*GB, 5, 0, 0); checkResources(userSource, 0, 0, 0, 0, 0, 10*GB, 10, 15*GB, 15, 5, 0, 0, 0);
metrics.incrAppsRunning(app, user); metrics.incrAppsRunning(app, user);
checkApps(queueSource, 1, 0, 1, 0, 0, 0); checkApps(queueSource, 1, 0, 1, 0, 0, 0);
checkApps(userSource, 1, 0, 1, 0, 0, 0); checkApps(userSource, 1, 0, 1, 0, 0, 0);
metrics.allocateResources(user, 3, Resources.createResource(2*GB)); metrics.allocateResources(user, 3, Resources.createResource(2*GB, 2));
checkResources(queueSource, 6*GB, 3, 3, 0, 100*GB, 9*GB, 2, 0, 0); checkResources(queueSource, 6*GB, 6, 3, 3, 0, 100*GB, 100, 9*GB, 9, 2, 0, 0, 0);
checkResources(userSource, 6*GB, 3, 3, 0, 10*GB, 9*GB, 2, 0, 0); checkResources(userSource, 6*GB, 6, 3, 3, 0, 10*GB, 10, 9*GB, 9, 2, 0, 0, 0);
metrics.releaseResources(user, 1, Resources.createResource(2*GB)); metrics.releaseResources(user, 1, Resources.createResource(2*GB, 2));
checkResources(queueSource, 4*GB, 2, 3, 1, 100*GB, 9*GB, 2, 0, 0); checkResources(queueSource, 4*GB, 4, 2, 3, 1, 100*GB, 100, 9*GB, 9, 2, 0, 0, 0);
checkResources(userSource, 4*GB, 2, 3, 1, 10*GB, 9*GB, 2, 0, 0); checkResources(userSource, 4*GB, 4, 2, 3, 1, 10*GB, 10, 9*GB, 9, 2, 0, 0, 0);
metrics.finishApp(app, RMAppAttemptState.FINISHED); metrics.finishApp(app, RMAppAttemptState.FINISHED);
checkApps(queueSource, 1, 0, 0, 1, 0, 0); checkApps(queueSource, 1, 0, 0, 1, 0, 0);
@ -197,35 +197,35 @@ public class TestQueueMetrics {
checkApps(userSource, 1, 1, 0, 0, 0, 0); checkApps(userSource, 1, 1, 0, 0, 0, 0);
checkApps(parentUserSource, 1, 1, 0, 0, 0, 0); checkApps(parentUserSource, 1, 1, 0, 0, 0, 0);
parentMetrics.setAvailableResourcesToQueue(Resources.createResource(100*GB)); parentMetrics.setAvailableResourcesToQueue(Resources.createResource(100*GB, 100));
metrics.setAvailableResourcesToQueue(Resources.createResource(100*GB)); metrics.setAvailableResourcesToQueue(Resources.createResource(100*GB, 100));
parentMetrics.setAvailableResourcesToUser(user, Resources.createResource(10*GB)); parentMetrics.setAvailableResourcesToUser(user, Resources.createResource(10*GB, 10));
metrics.setAvailableResourcesToUser(user, Resources.createResource(10*GB)); metrics.setAvailableResourcesToUser(user, Resources.createResource(10*GB, 10));
metrics.incrPendingResources(user, 5, Resources.createResource(15*GB)); metrics.incrPendingResources(user, 5, Resources.createResource(15*GB, 15));
checkResources(queueSource, 0, 0, 0, 0, 100*GB, 15*GB, 5, 0, 0); checkResources(queueSource, 0, 0, 0, 0, 0, 100*GB, 100, 15*GB, 15, 5, 0, 0, 0);
checkResources(parentQueueSource, 0, 0, 0, 0, 100*GB, 15*GB, 5, 0, 0); checkResources(parentQueueSource, 0, 0, 0, 0, 0, 100*GB, 100, 15*GB, 15, 5, 0, 0, 0);
checkResources(userSource, 0, 0, 0, 0, 10*GB, 15*GB, 5, 0, 0); checkResources(userSource, 0, 0, 0, 0, 0, 10*GB, 10, 15*GB, 15, 5, 0, 0, 0);
checkResources(parentUserSource, 0, 0, 0, 0, 10*GB, 15*GB, 5, 0, 0); checkResources(parentUserSource, 0, 0, 0, 0, 0, 10*GB, 10, 15*GB, 15, 5, 0, 0, 0);
metrics.incrAppsRunning(app, user); metrics.incrAppsRunning(app, user);
checkApps(queueSource, 1, 0, 1, 0, 0, 0); checkApps(queueSource, 1, 0, 1, 0, 0, 0);
checkApps(userSource, 1, 0, 1, 0, 0, 0); checkApps(userSource, 1, 0, 1, 0, 0, 0);
metrics.allocateResources(user, 3, Resources.createResource(2*GB)); metrics.allocateResources(user, 3, Resources.createResource(2*GB, 2));
metrics.reserveResource(user, Resources.createResource(3*GB)); metrics.reserveResource(user, Resources.createResource(3*GB, 3));
// Available resources is set externally, as it depends on dynamic // Available resources is set externally, as it depends on dynamic
// configurable cluster/queue resources // configurable cluster/queue resources
checkResources(queueSource, 6*GB, 3, 3, 0, 100*GB, 9*GB, 2, 3*GB, 1); checkResources(queueSource, 6*GB, 6, 3, 3, 0, 100*GB, 100, 9*GB, 9, 2, 3*GB, 3, 1);
checkResources(parentQueueSource, 6*GB, 3, 3, 0, 100*GB, 9*GB, 2, 3*GB, 1); checkResources(parentQueueSource, 6*GB, 6, 3, 3, 0, 100*GB, 100, 9*GB, 9, 2, 3*GB, 3, 1);
checkResources(userSource, 6*GB, 3, 3, 0, 10*GB, 9*GB, 2, 3*GB, 1); checkResources(userSource, 6*GB, 6, 3, 3, 0, 10*GB, 10, 9*GB, 9, 2, 3*GB, 3, 1);
checkResources(parentUserSource, 6*GB, 3, 3, 0, 10*GB, 9*GB, 2, 3*GB, 1); checkResources(parentUserSource, 6*GB, 6, 3, 3, 0, 10*GB, 10, 9*GB, 9, 2, 3*GB, 3, 1);
metrics.releaseResources(user, 1, Resources.createResource(2*GB)); metrics.releaseResources(user, 1, Resources.createResource(2*GB, 2));
metrics.unreserveResource(user, Resources.createResource(3*GB)); metrics.unreserveResource(user, Resources.createResource(3*GB, 3));
checkResources(queueSource, 4*GB, 2, 3, 1, 100*GB, 9*GB, 2, 0, 0); checkResources(queueSource, 4*GB, 4, 2, 3, 1, 100*GB, 100, 9*GB, 9, 2, 0, 0, 0);
checkResources(parentQueueSource, 4*GB, 2, 3, 1, 100*GB, 9*GB, 2, 0, 0); checkResources(parentQueueSource, 4*GB, 4, 2, 3, 1, 100*GB, 100, 9*GB, 9, 2, 0, 0, 0);
checkResources(userSource, 4*GB, 2, 3, 1, 10*GB, 9*GB, 2, 0, 0); checkResources(userSource, 4*GB, 4, 2, 3, 1, 10*GB, 10, 9*GB, 9, 2, 0, 0, 0);
checkResources(parentUserSource, 4*GB, 2, 3, 1, 10*GB, 9*GB, 2, 0, 0); checkResources(parentUserSource, 4*GB, 4, 2, 3, 1, 10*GB, 10, 9*GB, 9, 2, 0, 0, 0);
metrics.finishApp(app, RMAppAttemptState.FINISHED); metrics.finishApp(app, RMAppAttemptState.FINISHED);
checkApps(queueSource, 1, 0, 0, 1, 0, 0); checkApps(queueSource, 1, 0, 0, 1, 0, 0);
@ -277,18 +277,23 @@ public class TestQueueMetrics {
} }
public static void checkResources(MetricsSource source, int allocatedMB, public static void checkResources(MetricsSource source, int allocatedMB,
int allocCtnrs, long aggreAllocCtnrs, long aggreReleasedCtnrs, int allocatedCores, int allocCtnrs, long aggreAllocCtnrs,
int availableMB, int pendingMB, int pendingCtnrs, long aggreReleasedCtnrs, int availableMB, int availableCores, int pendingMB,
int reservedMB, int reservedCtnrs) { int pendingCores, int pendingCtnrs, int reservedMB, int reservedCores,
int reservedCtnrs) {
MetricsRecordBuilder rb = getMetrics(source); MetricsRecordBuilder rb = getMetrics(source);
assertGauge("AllocatedMB", allocatedMB, rb); assertGauge("AllocatedMB", allocatedMB, rb);
assertGauge("AllocatedVCores", allocatedCores, rb);
assertGauge("AllocatedContainers", allocCtnrs, rb); assertGauge("AllocatedContainers", allocCtnrs, rb);
assertCounter("AggregateContainersAllocated", aggreAllocCtnrs, rb); assertCounter("AggregateContainersAllocated", aggreAllocCtnrs, rb);
assertCounter("AggregateContainersReleased", aggreReleasedCtnrs, rb); assertCounter("AggregateContainersReleased", aggreReleasedCtnrs, rb);
assertGauge("AvailableMB", availableMB, rb); assertGauge("AvailableMB", availableMB, rb);
assertGauge("AvailableVCores", availableCores, rb);
assertGauge("PendingMB", pendingMB, rb); assertGauge("PendingMB", pendingMB, rb);
assertGauge("PendingVCores", pendingCores, rb);
assertGauge("PendingContainers", pendingCtnrs, rb); assertGauge("PendingContainers", pendingCtnrs, rb);
assertGauge("ReservedMB", reservedMB, rb); assertGauge("ReservedMB", reservedMB, rb);
assertGauge("ReservedVCores", reservedCores, rb);
assertGauge("ReservedContainers", reservedCtnrs, rb); assertGauge("ReservedContainers", reservedCtnrs, rb);
} }