diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt index 8e6aef4730f..11d34ca32c9 100644 --- a/hadoop-yarn-project/CHANGES.txt +++ b/hadoop-yarn-project/CHANGES.txt @@ -316,6 +316,9 @@ Release 2.4.0 - UNRELEASED YARN-1574. RMDispatcher should be reset on transition to standby. (Xuan Gong via kasha) + YARN-1166. Fixed app-specific and attempt-specific QueueMetrics to be + triggered by accordingly app event and attempt event. + Release 2.3.0 - UNRELEASED INCOMPATIBLE CHANGES diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java index 4ccd53a9561..7ac1c0a6714 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java @@ -260,7 +260,7 @@ public class AppSchedulingInfo { // once an allocation is done we assume the application is // running from scheduler's POV. pending = false; - metrics.incrAppsRunning(this, user); + metrics.runAppAttempt(applicationId, user); } LOG.debug("allocate: user: " + user + ", memory: " + request.getCapability()); @@ -390,7 +390,7 @@ public class AppSchedulingInfo { .getNumContainers())); } } - metrics.finishApp(this, rmAppAttemptFinalState); + metrics.finishAppAttempt(applicationId, pending, user); // Clear requests themselves clearRequests(); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/QueueMetrics.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/QueueMetrics.java index 8a030952504..485157ed4fa 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/QueueMetrics.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/QueueMetrics.java @@ -41,7 +41,7 @@ import org.apache.hadoop.metrics2.lib.MutableGaugeInt; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.conf.YarnConfiguration; -import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState; import org.apache.hadoop.yarn.server.utils.BuilderUtils; import org.apache.hadoop.yarn.util.resource.Resources; import org.slf4j.Logger; @@ -57,7 +57,7 @@ public class QueueMetrics implements MetricsSource { @Metric("# of pending apps") MutableGaugeInt appsPending; @Metric("# of apps completed") MutableCounterInt appsCompleted; @Metric("# of apps killed") MutableCounterInt appsKilled; - @Metric("# of apps failed") MutableGaugeInt appsFailed; + @Metric("# of apps failed") MutableCounterInt appsFailed; @Metric("Allocated memory in MB") MutableGaugeInt allocatedMB; @Metric("Allocated CPU in virtual cores") MutableGaugeInt allocatedVCores; @@ -214,54 +214,70 @@ public class QueueMetrics implements MetricsSource { registry.snapshot(collector.addRecord(registry.info()), all); } - public void submitApp(String user, int attemptId) { - if (attemptId == 1) { - appsSubmitted.incr(); - } else { - appsFailed.decr(); - } - appsPending.incr(); + public void submitApp(String user) { + appsSubmitted.incr(); QueueMetrics userMetrics = getUserMetrics(user); if (userMetrics != null) { - userMetrics.submitApp(user, attemptId); + userMetrics.submitApp(user); } if (parent != null) { - parent.submitApp(user, attemptId); + parent.submitApp(user); } } - public void incrAppsRunning(AppSchedulingInfo app, String user) { - runBuckets.add(app.getApplicationId(), System.currentTimeMillis()); + public void submitAppAttempt(String user) { + appsPending.incr(); + QueueMetrics userMetrics = getUserMetrics(user); + if (userMetrics != null) { + userMetrics.submitAppAttempt(user); + } + if (parent != null) { + parent.submitAppAttempt(user); + } + } + + public void runAppAttempt(ApplicationId appId, String user) { + runBuckets.add(appId, System.currentTimeMillis()); appsRunning.incr(); appsPending.decr(); QueueMetrics userMetrics = getUserMetrics(user); if (userMetrics != null) { - userMetrics.incrAppsRunning(app, user); + userMetrics.runAppAttempt(appId, user); } if (parent != null) { - parent.incrAppsRunning(app, user); + parent.runAppAttempt(appId, user); } } - public void finishApp(AppSchedulingInfo app, - RMAppAttemptState rmAppAttemptFinalState) { - runBuckets.remove(app.getApplicationId()); - switch (rmAppAttemptFinalState) { - case KILLED: appsKilled.incr(); break; - case FAILED: appsFailed.incr(); break; - default: appsCompleted.incr(); break; - } - if (app.isPending()) { + public void finishAppAttempt( + ApplicationId appId, boolean isPending, String user) { + runBuckets.remove(appId); + if (isPending) { appsPending.decr(); } else { appsRunning.decr(); } - QueueMetrics userMetrics = getUserMetrics(app.getUser()); + QueueMetrics userMetrics = getUserMetrics(user); if (userMetrics != null) { - userMetrics.finishApp(app, rmAppAttemptFinalState); + userMetrics.finishAppAttempt(appId, isPending, user); } if (parent != null) { - parent.finishApp(app, rmAppAttemptFinalState); + parent.finishAppAttempt(appId, isPending, user); + } + } + + public void finishApp(String user, RMAppState rmAppFinalState) { + switch (rmAppFinalState) { + case KILLED: appsKilled.incr(); break; + case FAILED: appsFailed.incr(); break; + default: appsCompleted.incr(); break; + } + QueueMetrics userMetrics = getUserMetrics(user); + if (userMetrics != null) { + userMetrics.finishApp(user, rmAppFinalState); + } + if (parent != null) { + parent.finishApp(user, rmAppFinalState); } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplication.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplication.java index eb61785fb7e..1c4a5a638c5 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplication.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplication.java @@ -19,6 +19,7 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler; import org.apache.hadoop.classification.InterfaceAudience.Private; import org.apache.hadoop.classification.InterfaceStability.Unstable; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState; @Private @Unstable @@ -48,4 +49,9 @@ public class SchedulerApplication { public void setCurrentAppAttempt(SchedulerApplicationAttempt currentAttempt) { this.currentAttempt = currentAttempt; } + + public void stop(RMAppState rmAppFinalState) { + queue.getMetrics().finishApp(user, rmAppFinalState); + } + } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java index 4616916a894..6035ee1bc7e 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java @@ -491,6 +491,7 @@ public class CapacityScheduler if (application == null){ // The AppRemovedSchedulerEvent maybe sent on recovery for completed apps, // ignore it. + LOG.warn("Couldn't find application " + applicationId); return; } CSQueue queue = (CSQueue) application.getQueue(); @@ -500,6 +501,7 @@ public class CapacityScheduler } else { queue.finishApplication(applicationId, application.getUser()); } + application.stop(finalState); applications.remove(applicationId); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java index a8581a0a8d3..9bc80bc2dab 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java @@ -644,8 +644,7 @@ public class LeafQueue implements CSQueue { addApplicationAttempt(application, user); } - int attemptId = application.getApplicationAttemptId().getAttemptId(); - metrics.submitApp(userName, attemptId); + metrics.submitAppAttempt(userName); getParent().submitApplicationAttempt(application, userName); } @@ -702,6 +701,8 @@ public class LeafQueue implements CSQueue { getParent().getQueuePath(), ace); throw ace; } + + metrics.submitApp(userName); } private synchronized void activateApplications() { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java index a009f81f2a5..9fc43299681 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java @@ -643,6 +643,7 @@ public class FairScheduler implements ResourceScheduler { SchedulerApplication application = new SchedulerApplication(queue, user); applications.put(applicationId, application); + queue.getMetrics().submitApp(user); LOG.info("Accepted application " + applicationId + " from user: " + user + ", in queue: " + queueName + ", currently num of applications: " @@ -680,7 +681,7 @@ public class FairScheduler implements ResourceScheduler { maxRunningEnforcer.trackNonRunnableApp(attempt); } - queue.getMetrics().submitApp(user, applicationAttemptId.getAttemptId()); + queue.getMetrics().submitAppAttempt(user); LOG.info("Added Application Attempt " + applicationAttemptId + " to scheduler from user: " + user); @@ -714,6 +715,12 @@ public class FairScheduler implements ResourceScheduler { private synchronized void removeApplication(ApplicationId applicationId, RMAppState finalState) { + SchedulerApplication application = applications.get(applicationId); + if (application == null){ + LOG.warn("Couldn't find application " + applicationId); + return; + } + application.stop(finalState); applications.remove(applicationId); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java index 78e03bdcdd8..d88d1e26a13 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java @@ -363,8 +363,9 @@ public class FifoScheduler implements ResourceScheduler, Configurable { private synchronized void addApplication(ApplicationId applicationId, String queue, String user) { SchedulerApplication application = - new SchedulerApplication(null, user); + new SchedulerApplication(DEFAULT_QUEUE, user); applications.put(applicationId, application); + metrics.submitApp(user); LOG.info("Accepted application " + applicationId + " from user: " + user + ", currently num of applications: " + applications.size()); rmContext.getDispatcher().getEventHandler() @@ -388,7 +389,7 @@ public class FifoScheduler implements ResourceScheduler, Configurable { } application.setCurrentAppAttempt(schedulerApp); - metrics.submitApp(user, appAttemptId.getAttemptId()); + metrics.submitAppAttempt(user); LOG.info("Added Application Attempt " + appAttemptId + " to scheduler from user " + application.getUser()); rmContext.getDispatcher().getEventHandler().handle( @@ -399,10 +400,15 @@ public class FifoScheduler implements ResourceScheduler, Configurable { private synchronized void doneApplication(ApplicationId applicationId, RMAppState finalState) { SchedulerApplication application = applications.get(applicationId); + if (application == null){ + LOG.warn("Couldn't find application " + applicationId); + return; + } // Inform the activeUsersManager activeUsersManager.deactivateApplication(application.getUser(), applicationId); + application.stop(finalState); applications.remove(applicationId); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java index 440bddc510c..d50f0d7c5f7 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java @@ -84,6 +84,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics; import org.apache.hadoop.yarn.server.utils.BuilderUtils; import org.apache.hadoop.yarn.util.ConverterUtils; import org.apache.log4j.Level; @@ -179,7 +180,7 @@ public class TestRMRestart { am1.registerAppAttempt(); // AM request for containers - am1.allocate("127.0.0.1" , 1000, 1, new ArrayList()); + am1.allocate("127.0.0.1" , 1000, 1, new ArrayList()); // kick the scheduler nm1.nodeHeartbeat(true); List conts = am1.allocate(new ArrayList(), @@ -1543,6 +1544,128 @@ public class TestRMRestart { Assert.assertEquals(2, ((TestMemoryRMStateStore) memStore).updateApp); } + @SuppressWarnings("resource") + @Test + public void testQueueMetricsOnRMRestart() throws Exception { + conf.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, + YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS); + MemoryRMStateStore memStore = new MemoryRMStateStore(); + memStore.init(conf); + + // PHASE 1: create state in an RM + // start RM + MockRM rm1 = new MockRM(conf, memStore); + rm1.start(); + MockNM nm1 = + new MockNM("127.0.0.1:1234", 15120, rm1.getResourceTrackerService()); + nm1.registerNode(); + QueueMetrics qm1 = rm1.getResourceScheduler().getRootQueueMetrics(); + resetQueueMetrics(qm1); + assertQueueMetrics(qm1, 0, 0, 0, 0); + + // create app that gets launched and does allocate before RM restart + RMApp app1 = rm1.submitApp(200); + assertQueueMetrics(qm1, 1, 1, 0, 0); + nm1.nodeHeartbeat(true); + RMAppAttempt attempt1 = app1.getCurrentAppAttempt(); + ApplicationAttemptId attemptId1 = attempt1.getAppAttemptId(); + rm1.waitForState(attemptId1, RMAppAttemptState.ALLOCATED); + MockAM am1 = rm1.sendAMLaunched(attempt1.getAppAttemptId()); + am1.registerAppAttempt(); + am1.allocate("127.0.0.1" , 1000, 1, new ArrayList()); + nm1.nodeHeartbeat(true); + List conts = am1.allocate(new ArrayList(), + new ArrayList()).getAllocatedContainers(); + while (conts.size() == 0) { + nm1.nodeHeartbeat(true); + conts.addAll(am1.allocate(new ArrayList(), + new ArrayList()).getAllocatedContainers()); + Thread.sleep(500); + } + assertQueueMetrics(qm1, 1, 0, 1, 0); + + // PHASE 2: create new RM and start from old state + // create new RM to represent restart and recover state + MockRM rm2 = new MockRM(conf, memStore); + rm2.start(); + nm1.setResourceTrackerService(rm2.getResourceTrackerService()); + QueueMetrics qm2 = rm2.getResourceScheduler().getRootQueueMetrics(); + resetQueueMetrics(qm2); + assertQueueMetrics(qm2, 0, 0, 0, 0); + // recover app + RMApp loadedApp1 = rm2.getRMContext().getRMApps().get(app1.getApplicationId()); + am1.setAMRMProtocol(rm2.getApplicationMasterService()); + am1.allocate(new ArrayList(), new ArrayList()); + nm1.nodeHeartbeat(true); + nm1 = new MockNM("127.0.0.1:1234", 15120, rm2.getResourceTrackerService()); + List containerStatuses = new ArrayList(); + ContainerStatus containerStatus = + BuilderUtils.newContainerStatus(BuilderUtils.newContainerId(loadedApp1 + .getCurrentAppAttempt().getAppAttemptId(), 1), + ContainerState.COMPLETE, "Killed AM container", 143); + containerStatuses.add(containerStatus); + nm1.registerNode(containerStatuses); + int timeoutSecs = 0; + while (loadedApp1.getAppAttempts().size() != 2 && timeoutSecs++ < 40) {; + Thread.sleep(200); + } + + assertQueueMetrics(qm2, 1, 1, 0, 0); + nm1.nodeHeartbeat(true); + attempt1 = loadedApp1.getCurrentAppAttempt(); + attemptId1 = attempt1.getAppAttemptId(); + rm2.waitForState(attemptId1, RMAppAttemptState.ALLOCATED); + assertQueueMetrics(qm2, 1, 0, 1, 0); + am1 = rm2.sendAMLaunched(attempt1.getAppAttemptId()); + am1.registerAppAttempt(); + am1.allocate("127.0.0.1" , 1000, 3, new ArrayList()); + nm1.nodeHeartbeat(true); + conts = am1.allocate(new ArrayList(), + new ArrayList()).getAllocatedContainers(); + while (conts.size() == 0) { + nm1.nodeHeartbeat(true); + conts.addAll(am1.allocate(new ArrayList(), + new ArrayList()).getAllocatedContainers()); + Thread.sleep(500); + } + + // finish the AMs + finishApplicationMaster(loadedApp1, rm2, nm1, am1); + assertQueueMetrics(qm2, 1, 0, 0, 1); + + // stop RM's + rm2.stop(); + rm1.stop(); + } + + + // The metrics has some carry-on value from the previous RM, because the + // test case is in-memory, for the same queue name (e.g. root), there's + // always a singleton QueueMetrics object. + private int appsSubmittedCarryOn = 0; + private int appsPendingCarryOn = 0; + private int appsRunningCarryOn = 0; + private int appsCompletedCarryOn = 0; + + private void resetQueueMetrics(QueueMetrics qm) { + appsSubmittedCarryOn = qm.getAppsSubmitted(); + appsPendingCarryOn = qm.getAppsPending(); + appsRunningCarryOn = qm.getAppsRunning(); + appsCompletedCarryOn = qm.getAppsCompleted(); + } + + private void assertQueueMetrics(QueueMetrics qm, int appsSubmitted, + int appsPending, int appsRunning, int appsCompleted) { + Assert.assertEquals(qm.getAppsSubmitted(), + appsSubmitted + appsSubmittedCarryOn); + Assert.assertEquals(qm.getAppsPending(), + appsPending + appsPendingCarryOn); + Assert.assertEquals(qm.getAppsRunning(), + appsRunning + appsRunningCarryOn); + Assert.assertEquals(qm.getAppsCompleted(), + appsCompleted + appsCompletedCarryOn); + } + public class TestMemoryRMStateStore extends MemoryRMStateStore { int count = 0; public int updateApp = 0; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestQueueMetrics.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestQueueMetrics.java index b3702ad8437..d0a8f7235fd 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestQueueMetrics.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestQueueMetrics.java @@ -37,7 +37,7 @@ import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.server.resourcemanager.MockRM; -import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fifo.FifoScheduler; import org.apache.hadoop.yarn.server.utils.BuilderUtils; import org.apache.hadoop.yarn.util.resource.Resources; @@ -66,8 +66,10 @@ public class TestQueueMetrics { MetricsSource queueSource= queueSource(ms, queueName); AppSchedulingInfo app = mockApp(user); - metrics.submitApp(user, 1); + metrics.submitApp(user); MetricsSource userSource = userSource(ms, queueName, user); + checkApps(queueSource, 1, 0, 0, 0, 0, 0, true); + metrics.submitAppAttempt(user); checkApps(queueSource, 1, 1, 0, 0, 0, 0, true); metrics.setAvailableResourcesToQueue(Resources.createResource(100*GB, 100)); @@ -76,7 +78,7 @@ public class TestQueueMetrics { // configurable cluster/queue resources checkResources(queueSource, 0, 0, 0, 0, 0, 100*GB, 100, 15*GB, 15, 5, 0, 0, 0); - metrics.incrAppsRunning(app, user); + metrics.runAppAttempt(app.getApplicationId(), user); checkApps(queueSource, 1, 0, 1, 0, 0, 0, true); metrics.allocateResources(user, 3, Resources.createResource(2*GB, 2)); @@ -85,7 +87,10 @@ public class TestQueueMetrics { metrics.releaseResources(user, 1, Resources.createResource(2*GB, 2)); checkResources(queueSource, 4*GB, 4, 2, 3, 1, 100*GB, 100, 9*GB, 9, 2, 0, 0, 0); - metrics.finishApp(app, RMAppAttemptState.FINISHED); + metrics.finishAppAttempt( + app.getApplicationId(), app.isPending(), app.getUser()); + checkApps(queueSource, 1, 0, 0, 0, 0, 0, true); + metrics.finishApp(user, RMAppState.FINISHED); checkApps(queueSource, 1, 0, 0, 1, 0, 0, true); assertNull(userSource); } @@ -100,39 +105,47 @@ public class TestQueueMetrics { MetricsSource queueSource = queueSource(ms, queueName); AppSchedulingInfo app = mockApp(user); - metrics.submitApp(user, 1); + metrics.submitApp(user); MetricsSource userSource = userSource(ms, queueName, user); + checkApps(queueSource, 1, 0, 0, 0, 0, 0, true); + metrics.submitAppAttempt(user); checkApps(queueSource, 1, 1, 0, 0, 0, 0, true); - metrics.incrAppsRunning(app, user); + metrics.runAppAttempt(app.getApplicationId(), user); checkApps(queueSource, 1, 0, 1, 0, 0, 0, true); - metrics.finishApp(app, RMAppAttemptState.FAILED); - checkApps(queueSource, 1, 0, 0, 0, 1, 0, true); + metrics.finishAppAttempt( + app.getApplicationId(), app.isPending(), app.getUser()); + checkApps(queueSource, 1, 0, 0, 0, 0, 0, true); // As the application has failed, framework retries the same application // based on configuration - metrics.submitApp(user, 2); + metrics.submitAppAttempt(user); checkApps(queueSource, 1, 1, 0, 0, 0, 0, true); - metrics.incrAppsRunning(app, user); + metrics.runAppAttempt(app.getApplicationId(), user); checkApps(queueSource, 1, 0, 1, 0, 0, 0, true); // Suppose say application has failed this time as well. - metrics.finishApp(app, RMAppAttemptState.FAILED); - checkApps(queueSource, 1, 0, 0, 0, 1, 0, true); + metrics.finishAppAttempt( + app.getApplicationId(), app.isPending(), app.getUser()); + checkApps(queueSource, 1, 0, 0, 0, 0, 0, true); // As the application has failed, framework retries the same application // based on configuration - metrics.submitApp(user, 3); + metrics.submitAppAttempt(user); checkApps(queueSource, 1, 1, 0, 0, 0, 0, true); - metrics.incrAppsRunning(app, user); + metrics.runAppAttempt(app.getApplicationId(), user); checkApps(queueSource, 1, 0, 1, 0, 0, 0, true); - // Suppose say application has finished. - metrics.finishApp(app, RMAppAttemptState.FINISHED); - checkApps(queueSource, 1, 0, 0, 1, 0, 0, true); + // Suppose say application has failed, and there's no more retries. + metrics.finishAppAttempt( + app.getApplicationId(), app.isPending(), app.getUser()); + checkApps(queueSource, 1, 0, 0, 0, 0, 0, true); + + metrics.finishApp(user, RMAppState.FAILED); + checkApps(queueSource, 1, 0, 0, 0, 1, 0, true); assertNull(userSource); } @@ -146,9 +159,13 @@ public class TestQueueMetrics { MetricsSource queueSource = queueSource(ms, queueName); AppSchedulingInfo app = mockApp(user); - metrics.submitApp(user, 1); + metrics.submitApp(user); MetricsSource userSource = userSource(ms, queueName, user); + checkApps(queueSource, 1, 0, 0, 0, 0, 0, true); + checkApps(userSource, 1, 0, 0, 0, 0, 0, true); + + metrics.submitAppAttempt(user); checkApps(queueSource, 1, 1, 0, 0, 0, 0, true); checkApps(userSource, 1, 1, 0, 0, 0, 0, true); @@ -160,7 +177,7 @@ public class TestQueueMetrics { checkResources(queueSource, 0, 0, 0, 0, 0, 100*GB, 100, 15*GB, 15, 5, 0, 0, 0); checkResources(userSource, 0, 0, 0, 0, 0, 10*GB, 10, 15*GB, 15, 5, 0, 0, 0); - metrics.incrAppsRunning(app, user); + metrics.runAppAttempt(app.getApplicationId(), user); checkApps(queueSource, 1, 0, 1, 0, 0, 0, true); checkApps(userSource, 1, 0, 1, 0, 0, 0, true); @@ -172,7 +189,11 @@ public class TestQueueMetrics { checkResources(queueSource, 4*GB, 4, 2, 3, 1, 100*GB, 100, 9*GB, 9, 2, 0, 0, 0); checkResources(userSource, 4*GB, 4, 2, 3, 1, 10*GB, 10, 9*GB, 9, 2, 0, 0, 0); - metrics.finishApp(app, RMAppAttemptState.FINISHED); + metrics.finishAppAttempt( + app.getApplicationId(), app.isPending(), app.getUser()); + checkApps(queueSource, 1, 0, 0, 0, 0, 0, true); + checkApps(userSource, 1, 0, 0, 0, 0, 0, true); + metrics.finishApp(user, RMAppState.FINISHED); checkApps(queueSource, 1, 0, 0, 1, 0, 0, true); checkApps(userSource, 1, 0, 0, 1, 0, 0, true); } @@ -192,10 +213,16 @@ public class TestQueueMetrics { MetricsSource queueSource = queueSource(ms, leafQueueName); AppSchedulingInfo app = mockApp(user); - metrics.submitApp(user, 1); + metrics.submitApp(user); MetricsSource userSource = userSource(ms, leafQueueName, user); MetricsSource parentUserSource = userSource(ms, parentQueueName, user); + checkApps(queueSource, 1, 0, 0, 0, 0, 0, true); + checkApps(parentQueueSource, 1, 0, 0, 0, 0, 0, true); + checkApps(userSource, 1, 0, 0, 0, 0, 0, true); + checkApps(parentUserSource, 1, 0, 0, 0, 0, 0, true); + + metrics.submitAppAttempt(user); checkApps(queueSource, 1, 1, 0, 0, 0, 0, true); checkApps(parentQueueSource, 1, 1, 0, 0, 0, 0, true); checkApps(userSource, 1, 1, 0, 0, 0, 0, true); @@ -211,7 +238,7 @@ public class TestQueueMetrics { checkResources(userSource, 0, 0, 0, 0, 0, 10*GB, 10, 15*GB, 15, 5, 0, 0, 0); checkResources(parentUserSource, 0, 0, 0, 0, 0, 10*GB, 10, 15*GB, 15, 5, 0, 0, 0); - metrics.incrAppsRunning(app, user); + metrics.runAppAttempt(app.getApplicationId(), user); checkApps(queueSource, 1, 0, 1, 0, 0, 0, true); checkApps(userSource, 1, 0, 1, 0, 0, 0, true); @@ -231,7 +258,14 @@ public class TestQueueMetrics { checkResources(userSource, 4*GB, 4, 2, 3, 1, 10*GB, 10, 9*GB, 9, 2, 0, 0, 0); checkResources(parentUserSource, 4*GB, 4, 2, 3, 1, 10*GB, 10, 9*GB, 9, 2, 0, 0, 0); - metrics.finishApp(app, RMAppAttemptState.FINISHED); + metrics.finishAppAttempt( + app.getApplicationId(), app.isPending(), app.getUser()); + checkApps(queueSource, 1, 0, 0, 0, 0, 0, true); + checkApps(parentQueueSource, 1, 0, 0, 0, 0, 0, true); + checkApps(userSource, 1, 0, 0, 0, 0, 0, true); + checkApps(parentUserSource, 1, 0, 0, 0, 0, 0, true); + + metrics.finishApp(user, RMAppState.FINISHED); checkApps(queueSource, 1, 0, 0, 1, 0, 0, true); checkApps(parentQueueSource, 1, 0, 0, 1, 0, 0, true); checkApps(userSource, 1, 0, 0, 1, 0, 0, true); @@ -308,7 +342,7 @@ public class TestQueueMetrics { assertGauge("AppsPending", pending, rb); assertGauge("AppsRunning", running, rb); assertCounter("AppsCompleted", completed, rb); - assertGauge("AppsFailed", failed, rb); + assertCounter("AppsFailed", failed, rb); assertCounter("AppsKilled", killed, rb); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java index dde1ff4ea0c..5d91e8f30bd 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java @@ -55,6 +55,7 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.factories.RecordFactory; import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; import org.apache.hadoop.yarn.server.resourcemanager.RMContext; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEventType; @@ -66,6 +67,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaS import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAddedSchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAttemptAddedSchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAttemptRemovedSchedulerEvent; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppRemovedSchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager; import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator; import org.apache.hadoop.yarn.util.resource.ResourceCalculator; @@ -360,7 +362,7 @@ public class TestLeafQueue { cs.handle(event); assertEquals(0, a.getMetrics().getAppsPending()); - assertEquals(1, a.getMetrics().getAppsFailed()); + assertEquals(0, a.getMetrics().getAppsFailed()); // Attempt the same application again final ApplicationAttemptId appAttemptId_1 = TestUtils @@ -375,6 +377,9 @@ public class TestLeafQueue { event = new AppAttemptRemovedSchedulerEvent(appAttemptId_0, RMAppAttemptState.FINISHED, false); cs.handle(event); + AppRemovedSchedulerEvent rEvent = new AppRemovedSchedulerEvent( + appAttemptId_0.getApplicationId(), RMAppState.FINISHED); + cs.handle(rEvent); assertEquals(1, a.getMetrics().getAppsSubmitted()); assertEquals(0, a.getMetrics().getAppsPending()); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/TestFifoScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/TestFifoScheduler.java index 2a8c9815960..2dc0e8805e7 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/TestFifoScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/TestFifoScheduler.java @@ -70,7 +70,6 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAttemptA import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeAddedSchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateSchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent; -import org.apache.hadoop.yarn.server.resourcemanager.security.ClientToAMTokenSecretManagerInRM; import org.apache.hadoop.yarn.server.resourcemanager.security.NMTokenSecretManagerInRM; import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager; import org.apache.hadoop.yarn.server.utils.BuilderUtils; @@ -160,11 +159,6 @@ public class TestFifoScheduler { schedular.handle(attemptEvent); appAttemptId = BuilderUtils.newApplicationAttemptId(appId, 2); - - SchedulerEvent appEvent2 = - new AppAddedSchedulerEvent(appAttemptId.getApplicationId(), "queue", - "user"); - schedular.handle(appEvent2); SchedulerEvent attemptEvent2 = new AppAttemptAddedSchedulerEvent(appAttemptId, false); schedular.handle(attemptEvent2);