diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt index 80d004a7014..fa3a15810aa 100644 --- a/hadoop-yarn-project/CHANGES.txt +++ b/hadoop-yarn-project/CHANGES.txt @@ -238,6 +238,8 @@ Release 2.0.5-beta - UNRELEASED YARN-568. Add support for work preserving preemption to the FairScheduler. (Carlo Curino and Sandy Ryza via cdouglas) + YARN-598. Add virtual cores to queue metrics. (sandyr via tucu) + OPTIMIZATIONS BUG FIXES diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/QueueMetrics.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/QueueMetrics.java index 13def49c20d..4ba36149487 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/QueueMetrics.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/QueueMetrics.java @@ -60,13 +60,17 @@ public class QueueMetrics implements MetricsSource { @Metric("# of apps failed") MutableGaugeInt appsFailed; @Metric("Allocated memory in MB") MutableGaugeInt allocatedMB; + @Metric("Allocated CPU in virtual cores") MutableGaugeInt allocatedVCores; @Metric("# of allocated containers") MutableGaugeInt allocatedContainers; @Metric("Aggregate # of allocated containers") MutableCounterLong aggregateContainersAllocated; @Metric("Aggregate # of released containers") MutableCounterLong aggregateContainersReleased; @Metric("Available memory in MB") MutableGaugeInt availableMB; + @Metric("Available CPU in virtual cores") MutableGaugeInt availableVCores; @Metric("Pending memory allocation in MB") MutableGaugeInt pendingMB; + @Metric("Pending CPU allocation in virtual cores") MutableGaugeInt pendingVCores; @Metric("# of pending containers") MutableGaugeInt pendingContainers; @Metric("# of reserved memory in MB") MutableGaugeInt reservedMB; + @Metric("Reserved CPU in virtual cores") MutableGaugeInt reservedVCores; @Metric("# of reserved containers") MutableGaugeInt reservedContainers; @Metric("# of active users") MutableGaugeInt activeUsers; @Metric("# of active users") MutableGaugeInt activeApplications; @@ -268,6 +272,7 @@ public void finishApp(AppSchedulingInfo app, */ public void setAvailableResourcesToQueue(Resource limit) { availableMB.set(limit.getMemory()); + availableVCores.set(limit.getVirtualCores()); } /** @@ -304,6 +309,7 @@ public void incrPendingResources(String user, int containers, Resource res) { private void _incrPendingResources(int containers, Resource res) { pendingContainers.incr(containers); pendingMB.incr(res.getMemory()); + pendingVCores.incr(res.getVirtualCores()); } public void decrPendingResources(String user, int containers, Resource res) { @@ -320,12 +326,14 @@ public void decrPendingResources(String user, int containers, Resource res) { private void _decrPendingResources(int containers, Resource res) { pendingContainers.decr(containers); pendingMB.decr(res.getMemory()); + pendingVCores.decr(res.getVirtualCores()); } public void allocateResources(String user, int containers, Resource res) { allocatedContainers.incr(containers); aggregateContainersAllocated.incr(containers); allocatedMB.incr(res.getMemory() * containers); + allocatedVCores.incr(res.getVirtualCores() * containers); _decrPendingResources(containers, Resources.multiply(res, containers)); QueueMetrics userMetrics = getUserMetrics(user); if (userMetrics != null) { @@ -340,6 +348,7 @@ public void releaseResources(String user, int containers, Resource res) { allocatedContainers.decr(containers); aggregateContainersReleased.incr(containers); allocatedMB.decr(res.getMemory() * containers); + allocatedVCores.decr(res.getVirtualCores() * containers); QueueMetrics userMetrics = getUserMetrics(user); if (userMetrics != null) { userMetrics.releaseResources(user, containers, res); @@ -352,6 +361,7 @@ public void releaseResources(String user, int containers, Resource res) { public void reserveResource(String user, Resource res) { reservedContainers.incr(); reservedMB.incr(res.getMemory()); + reservedVCores.incr(res.getVirtualCores()); QueueMetrics userMetrics = getUserMetrics(user); if (userMetrics != null) { userMetrics.reserveResource(user, res); @@ -364,6 +374,7 @@ public void reserveResource(String user, Resource res) { public void unreserveResource(String user, Resource res) { reservedContainers.decr(); reservedMB.decr(res.getMemory()); + reservedVCores.decr(res.getVirtualCores()); QueueMetrics userMetrics = getUserMetrics(user); if (userMetrics != null) { userMetrics.unreserveResource(user, res); @@ -434,6 +445,10 @@ public Resource getAllocatedResources() { public int getAllocatedMB() { return allocatedMB.value(); } + + public int getAllocatedVirtualCores() { + return allocatedVCores.value(); + } public int getAllocatedContainers() { return allocatedContainers.value(); @@ -442,10 +457,18 @@ public int getAllocatedContainers() { public int getAvailableMB() { return availableMB.value(); } + + public int getAvailableVirtualCores() { + return availableVCores.value(); + } public int getPendingMB() { return pendingMB.value(); } + + public int getPendingVirtualCores() { + return pendingVCores.value(); + } public int getPendingContainers() { return pendingContainers.value(); @@ -454,6 +477,10 @@ public int getPendingContainers() { public int getReservedMB() { return reservedMB.value(); } + + public int getReservedVirtualCores() { + return reservedVCores.value(); + } public int getReservedContainers() { return reservedContainers.value(); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestQueueMetrics.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestQueueMetrics.java index 88b6f40cb80..017dadc9f5f 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestQueueMetrics.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestQueueMetrics.java @@ -66,20 +66,20 @@ public void setUp() { MetricsSource userSource = userSource(ms, queueName, user); checkApps(queueSource, 1, 1, 0, 0, 0, 0); - metrics.setAvailableResourcesToQueue(Resources.createResource(100*GB)); - metrics.incrPendingResources(user, 5, Resources.createResource(15*GB)); + metrics.setAvailableResourcesToQueue(Resources.createResource(100*GB, 100)); + metrics.incrPendingResources(user, 5, Resources.createResource(15*GB, 15)); // Available resources is set externally, as it depends on dynamic // configurable cluster/queue resources - checkResources(queueSource, 0, 0, 0, 0, 100*GB, 15*GB, 5, 0, 0); + checkResources(queueSource, 0, 0, 0, 0, 0, 100*GB, 100, 15*GB, 15, 5, 0, 0, 0); metrics.incrAppsRunning(app, user); checkApps(queueSource, 1, 0, 1, 0, 0, 0); - metrics.allocateResources(user, 3, Resources.createResource(2*GB)); - checkResources(queueSource, 6*GB, 3, 3, 0, 100*GB, 9*GB, 2, 0, 0); + metrics.allocateResources(user, 3, Resources.createResource(2*GB, 2)); + checkResources(queueSource, 6*GB, 6, 3, 3, 0, 100*GB, 100, 9*GB, 9, 2, 0, 0, 0); - metrics.releaseResources(user, 1, Resources.createResource(2*GB)); - checkResources(queueSource, 4*GB, 2, 3, 1, 100*GB, 9*GB, 2, 0, 0); + metrics.releaseResources(user, 1, Resources.createResource(2*GB, 2)); + checkResources(queueSource, 4*GB, 4, 2, 3, 1, 100*GB, 100, 9*GB, 9, 2, 0, 0, 0); metrics.finishApp(app, RMAppAttemptState.FINISHED); checkApps(queueSource, 1, 0, 0, 1, 0, 0); @@ -148,25 +148,25 @@ public void testQueueAppMetricsForMultipleFailures() { checkApps(queueSource, 1, 1, 0, 0, 0, 0); checkApps(userSource, 1, 1, 0, 0, 0, 0); - metrics.setAvailableResourcesToQueue(Resources.createResource(100*GB)); - metrics.setAvailableResourcesToUser(user, Resources.createResource(10*GB)); - metrics.incrPendingResources(user, 5, Resources.createResource(15*GB)); + metrics.setAvailableResourcesToQueue(Resources.createResource(100*GB, 100)); + metrics.setAvailableResourcesToUser(user, Resources.createResource(10*GB, 10)); + metrics.incrPendingResources(user, 5, Resources.createResource(15*GB, 15)); // Available resources is set externally, as it depends on dynamic // configurable cluster/queue resources - checkResources(queueSource, 0, 0, 0, 0, 100*GB, 15*GB, 5, 0, 0); - checkResources(userSource, 0, 0, 0, 0, 10*GB, 15*GB, 5, 0, 0); + checkResources(queueSource, 0, 0, 0, 0, 0, 100*GB, 100, 15*GB, 15, 5, 0, 0, 0); + checkResources(userSource, 0, 0, 0, 0, 0, 10*GB, 10, 15*GB, 15, 5, 0, 0, 0); metrics.incrAppsRunning(app, user); checkApps(queueSource, 1, 0, 1, 0, 0, 0); checkApps(userSource, 1, 0, 1, 0, 0, 0); - metrics.allocateResources(user, 3, Resources.createResource(2*GB)); - checkResources(queueSource, 6*GB, 3, 3, 0, 100*GB, 9*GB, 2, 0, 0); - checkResources(userSource, 6*GB, 3, 3, 0, 10*GB, 9*GB, 2, 0, 0); + metrics.allocateResources(user, 3, Resources.createResource(2*GB, 2)); + checkResources(queueSource, 6*GB, 6, 3, 3, 0, 100*GB, 100, 9*GB, 9, 2, 0, 0, 0); + checkResources(userSource, 6*GB, 6, 3, 3, 0, 10*GB, 10, 9*GB, 9, 2, 0, 0, 0); - metrics.releaseResources(user, 1, Resources.createResource(2*GB)); - checkResources(queueSource, 4*GB, 2, 3, 1, 100*GB, 9*GB, 2, 0, 0); - checkResources(userSource, 4*GB, 2, 3, 1, 10*GB, 9*GB, 2, 0, 0); + metrics.releaseResources(user, 1, Resources.createResource(2*GB, 2)); + checkResources(queueSource, 4*GB, 4, 2, 3, 1, 100*GB, 100, 9*GB, 9, 2, 0, 0, 0); + checkResources(userSource, 4*GB, 4, 2, 3, 1, 10*GB, 10, 9*GB, 9, 2, 0, 0, 0); metrics.finishApp(app, RMAppAttemptState.FINISHED); checkApps(queueSource, 1, 0, 0, 1, 0, 0); @@ -197,35 +197,35 @@ public void testQueueAppMetricsForMultipleFailures() { checkApps(userSource, 1, 1, 0, 0, 0, 0); checkApps(parentUserSource, 1, 1, 0, 0, 0, 0); - parentMetrics.setAvailableResourcesToQueue(Resources.createResource(100*GB)); - metrics.setAvailableResourcesToQueue(Resources.createResource(100*GB)); - parentMetrics.setAvailableResourcesToUser(user, Resources.createResource(10*GB)); - metrics.setAvailableResourcesToUser(user, Resources.createResource(10*GB)); - metrics.incrPendingResources(user, 5, Resources.createResource(15*GB)); - checkResources(queueSource, 0, 0, 0, 0, 100*GB, 15*GB, 5, 0, 0); - checkResources(parentQueueSource, 0, 0, 0, 0, 100*GB, 15*GB, 5, 0, 0); - checkResources(userSource, 0, 0, 0, 0, 10*GB, 15*GB, 5, 0, 0); - checkResources(parentUserSource, 0, 0, 0, 0, 10*GB, 15*GB, 5, 0, 0); + parentMetrics.setAvailableResourcesToQueue(Resources.createResource(100*GB, 100)); + metrics.setAvailableResourcesToQueue(Resources.createResource(100*GB, 100)); + parentMetrics.setAvailableResourcesToUser(user, Resources.createResource(10*GB, 10)); + metrics.setAvailableResourcesToUser(user, Resources.createResource(10*GB, 10)); + metrics.incrPendingResources(user, 5, Resources.createResource(15*GB, 15)); + checkResources(queueSource, 0, 0, 0, 0, 0, 100*GB, 100, 15*GB, 15, 5, 0, 0, 0); + checkResources(parentQueueSource, 0, 0, 0, 0, 0, 100*GB, 100, 15*GB, 15, 5, 0, 0, 0); + checkResources(userSource, 0, 0, 0, 0, 0, 10*GB, 10, 15*GB, 15, 5, 0, 0, 0); + checkResources(parentUserSource, 0, 0, 0, 0, 0, 10*GB, 10, 15*GB, 15, 5, 0, 0, 0); metrics.incrAppsRunning(app, user); checkApps(queueSource, 1, 0, 1, 0, 0, 0); checkApps(userSource, 1, 0, 1, 0, 0, 0); - metrics.allocateResources(user, 3, Resources.createResource(2*GB)); - metrics.reserveResource(user, Resources.createResource(3*GB)); + metrics.allocateResources(user, 3, Resources.createResource(2*GB, 2)); + metrics.reserveResource(user, Resources.createResource(3*GB, 3)); // Available resources is set externally, as it depends on dynamic // configurable cluster/queue resources - checkResources(queueSource, 6*GB, 3, 3, 0, 100*GB, 9*GB, 2, 3*GB, 1); - checkResources(parentQueueSource, 6*GB, 3, 3, 0, 100*GB, 9*GB, 2, 3*GB, 1); - checkResources(userSource, 6*GB, 3, 3, 0, 10*GB, 9*GB, 2, 3*GB, 1); - checkResources(parentUserSource, 6*GB, 3, 3, 0, 10*GB, 9*GB, 2, 3*GB, 1); + checkResources(queueSource, 6*GB, 6, 3, 3, 0, 100*GB, 100, 9*GB, 9, 2, 3*GB, 3, 1); + checkResources(parentQueueSource, 6*GB, 6, 3, 3, 0, 100*GB, 100, 9*GB, 9, 2, 3*GB, 3, 1); + checkResources(userSource, 6*GB, 6, 3, 3, 0, 10*GB, 10, 9*GB, 9, 2, 3*GB, 3, 1); + checkResources(parentUserSource, 6*GB, 6, 3, 3, 0, 10*GB, 10, 9*GB, 9, 2, 3*GB, 3, 1); - metrics.releaseResources(user, 1, Resources.createResource(2*GB)); - metrics.unreserveResource(user, Resources.createResource(3*GB)); - checkResources(queueSource, 4*GB, 2, 3, 1, 100*GB, 9*GB, 2, 0, 0); - checkResources(parentQueueSource, 4*GB, 2, 3, 1, 100*GB, 9*GB, 2, 0, 0); - checkResources(userSource, 4*GB, 2, 3, 1, 10*GB, 9*GB, 2, 0, 0); - checkResources(parentUserSource, 4*GB, 2, 3, 1, 10*GB, 9*GB, 2, 0, 0); + metrics.releaseResources(user, 1, Resources.createResource(2*GB, 2)); + metrics.unreserveResource(user, Resources.createResource(3*GB, 3)); + checkResources(queueSource, 4*GB, 4, 2, 3, 1, 100*GB, 100, 9*GB, 9, 2, 0, 0, 0); + checkResources(parentQueueSource, 4*GB, 4, 2, 3, 1, 100*GB, 100, 9*GB, 9, 2, 0, 0, 0); + checkResources(userSource, 4*GB, 4, 2, 3, 1, 10*GB, 10, 9*GB, 9, 2, 0, 0, 0); + checkResources(parentUserSource, 4*GB, 4, 2, 3, 1, 10*GB, 10, 9*GB, 9, 2, 0, 0, 0); metrics.finishApp(app, RMAppAttemptState.FINISHED); checkApps(queueSource, 1, 0, 0, 1, 0, 0); @@ -277,18 +277,23 @@ public static void checkApps(MetricsSource source, int submitted, int pending, } public static void checkResources(MetricsSource source, int allocatedMB, - int allocCtnrs, long aggreAllocCtnrs, long aggreReleasedCtnrs, - int availableMB, int pendingMB, int pendingCtnrs, - int reservedMB, int reservedCtnrs) { + int allocatedCores, int allocCtnrs, long aggreAllocCtnrs, + long aggreReleasedCtnrs, int availableMB, int availableCores, int pendingMB, + int pendingCores, int pendingCtnrs, int reservedMB, int reservedCores, + int reservedCtnrs) { MetricsRecordBuilder rb = getMetrics(source); assertGauge("AllocatedMB", allocatedMB, rb); + assertGauge("AllocatedVCores", allocatedCores, rb); assertGauge("AllocatedContainers", allocCtnrs, rb); assertCounter("AggregateContainersAllocated", aggreAllocCtnrs, rb); assertCounter("AggregateContainersReleased", aggreReleasedCtnrs, rb); assertGauge("AvailableMB", availableMB, rb); + assertGauge("AvailableVCores", availableCores, rb); assertGauge("PendingMB", pendingMB, rb); + assertGauge("PendingVCores", pendingCores, rb); assertGauge("PendingContainers", pendingCtnrs, rb); assertGauge("ReservedMB", reservedMB, rb); + assertGauge("ReservedVCores", reservedCores, rb); assertGauge("ReservedContainers", reservedCtnrs, rb); }