YARN-1166. Fixed app-specific and attempt-specific QueueMetrics to be triggered by accordingly app event and attempt event. Contributed by Zhijie Shen

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1557296 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Jian He 2014-01-11 01:15:49 +00:00
parent 9d382a4174
commit f677175f35
12 changed files with 263 additions and 66 deletions

View File

@ -316,6 +316,9 @@ Release 2.4.0 - UNRELEASED
YARN-1574. RMDispatcher should be reset on transition to standby. (Xuan Gong YARN-1574. RMDispatcher should be reset on transition to standby. (Xuan Gong
via kasha) via kasha)
YARN-1166. Fixed app-specific and attempt-specific QueueMetrics to be
triggered by accordingly app event and attempt event.
Release 2.3.0 - UNRELEASED Release 2.3.0 - UNRELEASED
INCOMPATIBLE CHANGES INCOMPATIBLE CHANGES

View File

@ -260,7 +260,7 @@ public class AppSchedulingInfo {
// once an allocation is done we assume the application is // once an allocation is done we assume the application is
// running from scheduler's POV. // running from scheduler's POV.
pending = false; pending = false;
metrics.incrAppsRunning(this, user); metrics.runAppAttempt(applicationId, user);
} }
LOG.debug("allocate: user: " + user + ", memory: " LOG.debug("allocate: user: " + user + ", memory: "
+ request.getCapability()); + request.getCapability());
@ -390,7 +390,7 @@ public class AppSchedulingInfo {
.getNumContainers())); .getNumContainers()));
} }
} }
metrics.finishApp(this, rmAppAttemptFinalState); metrics.finishAppAttempt(applicationId, pending, user);
// Clear requests themselves // Clear requests themselves
clearRequests(); clearRequests();

View File

@ -41,7 +41,7 @@ import org.apache.hadoop.metrics2.lib.MutableGaugeInt;
import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
import org.apache.hadoop.yarn.server.utils.BuilderUtils; import org.apache.hadoop.yarn.server.utils.BuilderUtils;
import org.apache.hadoop.yarn.util.resource.Resources; import org.apache.hadoop.yarn.util.resource.Resources;
import org.slf4j.Logger; import org.slf4j.Logger;
@ -57,7 +57,7 @@ public class QueueMetrics implements MetricsSource {
@Metric("# of pending apps") MutableGaugeInt appsPending; @Metric("# of pending apps") MutableGaugeInt appsPending;
@Metric("# of apps completed") MutableCounterInt appsCompleted; @Metric("# of apps completed") MutableCounterInt appsCompleted;
@Metric("# of apps killed") MutableCounterInt appsKilled; @Metric("# of apps killed") MutableCounterInt appsKilled;
@Metric("# of apps failed") MutableGaugeInt appsFailed; @Metric("# of apps failed") MutableCounterInt appsFailed;
@Metric("Allocated memory in MB") MutableGaugeInt allocatedMB; @Metric("Allocated memory in MB") MutableGaugeInt allocatedMB;
@Metric("Allocated CPU in virtual cores") MutableGaugeInt allocatedVCores; @Metric("Allocated CPU in virtual cores") MutableGaugeInt allocatedVCores;
@ -214,54 +214,70 @@ public class QueueMetrics implements MetricsSource {
registry.snapshot(collector.addRecord(registry.info()), all); registry.snapshot(collector.addRecord(registry.info()), all);
} }
public void submitApp(String user, int attemptId) { public void submitApp(String user) {
if (attemptId == 1) { appsSubmitted.incr();
appsSubmitted.incr();
} else {
appsFailed.decr();
}
appsPending.incr();
QueueMetrics userMetrics = getUserMetrics(user); QueueMetrics userMetrics = getUserMetrics(user);
if (userMetrics != null) { if (userMetrics != null) {
userMetrics.submitApp(user, attemptId); userMetrics.submitApp(user);
} }
if (parent != null) { if (parent != null) {
parent.submitApp(user, attemptId); parent.submitApp(user);
} }
} }
public void incrAppsRunning(AppSchedulingInfo app, String user) { public void submitAppAttempt(String user) {
runBuckets.add(app.getApplicationId(), System.currentTimeMillis()); appsPending.incr();
QueueMetrics userMetrics = getUserMetrics(user);
if (userMetrics != null) {
userMetrics.submitAppAttempt(user);
}
if (parent != null) {
parent.submitAppAttempt(user);
}
}
public void runAppAttempt(ApplicationId appId, String user) {
runBuckets.add(appId, System.currentTimeMillis());
appsRunning.incr(); appsRunning.incr();
appsPending.decr(); appsPending.decr();
QueueMetrics userMetrics = getUserMetrics(user); QueueMetrics userMetrics = getUserMetrics(user);
if (userMetrics != null) { if (userMetrics != null) {
userMetrics.incrAppsRunning(app, user); userMetrics.runAppAttempt(appId, user);
} }
if (parent != null) { if (parent != null) {
parent.incrAppsRunning(app, user); parent.runAppAttempt(appId, user);
} }
} }
public void finishApp(AppSchedulingInfo app, public void finishAppAttempt(
RMAppAttemptState rmAppAttemptFinalState) { ApplicationId appId, boolean isPending, String user) {
runBuckets.remove(app.getApplicationId()); runBuckets.remove(appId);
switch (rmAppAttemptFinalState) { if (isPending) {
case KILLED: appsKilled.incr(); break;
case FAILED: appsFailed.incr(); break;
default: appsCompleted.incr(); break;
}
if (app.isPending()) {
appsPending.decr(); appsPending.decr();
} else { } else {
appsRunning.decr(); appsRunning.decr();
} }
QueueMetrics userMetrics = getUserMetrics(app.getUser()); QueueMetrics userMetrics = getUserMetrics(user);
if (userMetrics != null) { if (userMetrics != null) {
userMetrics.finishApp(app, rmAppAttemptFinalState); userMetrics.finishAppAttempt(appId, isPending, user);
} }
if (parent != null) { if (parent != null) {
parent.finishApp(app, rmAppAttemptFinalState); parent.finishAppAttempt(appId, isPending, user);
}
}
public void finishApp(String user, RMAppState rmAppFinalState) {
switch (rmAppFinalState) {
case KILLED: appsKilled.incr(); break;
case FAILED: appsFailed.incr(); break;
default: appsCompleted.incr(); break;
}
QueueMetrics userMetrics = getUserMetrics(user);
if (userMetrics != null) {
userMetrics.finishApp(user, rmAppFinalState);
}
if (parent != null) {
parent.finishApp(user, rmAppFinalState);
} }
} }

View File

@ -19,6 +19,7 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler;
import org.apache.hadoop.classification.InterfaceAudience.Private; import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceStability.Unstable; import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
@Private @Private
@Unstable @Unstable
@ -48,4 +49,9 @@ public class SchedulerApplication {
public void setCurrentAppAttempt(SchedulerApplicationAttempt currentAttempt) { public void setCurrentAppAttempt(SchedulerApplicationAttempt currentAttempt) {
this.currentAttempt = currentAttempt; this.currentAttempt = currentAttempt;
} }
public void stop(RMAppState rmAppFinalState) {
queue.getMetrics().finishApp(user, rmAppFinalState);
}
} }

View File

@ -491,6 +491,7 @@ public class CapacityScheduler
if (application == null){ if (application == null){
// The AppRemovedSchedulerEvent maybe sent on recovery for completed apps, // The AppRemovedSchedulerEvent maybe sent on recovery for completed apps,
// ignore it. // ignore it.
LOG.warn("Couldn't find application " + applicationId);
return; return;
} }
CSQueue queue = (CSQueue) application.getQueue(); CSQueue queue = (CSQueue) application.getQueue();
@ -500,6 +501,7 @@ public class CapacityScheduler
} else { } else {
queue.finishApplication(applicationId, application.getUser()); queue.finishApplication(applicationId, application.getUser());
} }
application.stop(finalState);
applications.remove(applicationId); applications.remove(applicationId);
} }

View File

@ -644,8 +644,7 @@ public class LeafQueue implements CSQueue {
addApplicationAttempt(application, user); addApplicationAttempt(application, user);
} }
int attemptId = application.getApplicationAttemptId().getAttemptId(); metrics.submitAppAttempt(userName);
metrics.submitApp(userName, attemptId);
getParent().submitApplicationAttempt(application, userName); getParent().submitApplicationAttempt(application, userName);
} }
@ -702,6 +701,8 @@ public class LeafQueue implements CSQueue {
getParent().getQueuePath(), ace); getParent().getQueuePath(), ace);
throw ace; throw ace;
} }
metrics.submitApp(userName);
} }
private synchronized void activateApplications() { private synchronized void activateApplications() {

View File

@ -643,6 +643,7 @@ public class FairScheduler implements ResourceScheduler {
SchedulerApplication application = SchedulerApplication application =
new SchedulerApplication(queue, user); new SchedulerApplication(queue, user);
applications.put(applicationId, application); applications.put(applicationId, application);
queue.getMetrics().submitApp(user);
LOG.info("Accepted application " + applicationId + " from user: " + user LOG.info("Accepted application " + applicationId + " from user: " + user
+ ", in queue: " + queueName + ", currently num of applications: " + ", in queue: " + queueName + ", currently num of applications: "
@ -680,7 +681,7 @@ public class FairScheduler implements ResourceScheduler {
maxRunningEnforcer.trackNonRunnableApp(attempt); maxRunningEnforcer.trackNonRunnableApp(attempt);
} }
queue.getMetrics().submitApp(user, applicationAttemptId.getAttemptId()); queue.getMetrics().submitAppAttempt(user);
LOG.info("Added Application Attempt " + applicationAttemptId LOG.info("Added Application Attempt " + applicationAttemptId
+ " to scheduler from user: " + user); + " to scheduler from user: " + user);
@ -714,6 +715,12 @@ public class FairScheduler implements ResourceScheduler {
private synchronized void removeApplication(ApplicationId applicationId, private synchronized void removeApplication(ApplicationId applicationId,
RMAppState finalState) { RMAppState finalState) {
SchedulerApplication application = applications.get(applicationId);
if (application == null){
LOG.warn("Couldn't find application " + applicationId);
return;
}
application.stop(finalState);
applications.remove(applicationId); applications.remove(applicationId);
} }

View File

@ -363,8 +363,9 @@ public class FifoScheduler implements ResourceScheduler, Configurable {
private synchronized void addApplication(ApplicationId applicationId, private synchronized void addApplication(ApplicationId applicationId,
String queue, String user) { String queue, String user) {
SchedulerApplication application = SchedulerApplication application =
new SchedulerApplication(null, user); new SchedulerApplication(DEFAULT_QUEUE, user);
applications.put(applicationId, application); applications.put(applicationId, application);
metrics.submitApp(user);
LOG.info("Accepted application " + applicationId + " from user: " + user LOG.info("Accepted application " + applicationId + " from user: " + user
+ ", currently num of applications: " + applications.size()); + ", currently num of applications: " + applications.size());
rmContext.getDispatcher().getEventHandler() rmContext.getDispatcher().getEventHandler()
@ -388,7 +389,7 @@ public class FifoScheduler implements ResourceScheduler, Configurable {
} }
application.setCurrentAppAttempt(schedulerApp); application.setCurrentAppAttempt(schedulerApp);
metrics.submitApp(user, appAttemptId.getAttemptId()); metrics.submitAppAttempt(user);
LOG.info("Added Application Attempt " + appAttemptId LOG.info("Added Application Attempt " + appAttemptId
+ " to scheduler from user " + application.getUser()); + " to scheduler from user " + application.getUser());
rmContext.getDispatcher().getEventHandler().handle( rmContext.getDispatcher().getEventHandler().handle(
@ -399,10 +400,15 @@ public class FifoScheduler implements ResourceScheduler, Configurable {
private synchronized void doneApplication(ApplicationId applicationId, private synchronized void doneApplication(ApplicationId applicationId,
RMAppState finalState) { RMAppState finalState) {
SchedulerApplication application = applications.get(applicationId); SchedulerApplication application = applications.get(applicationId);
if (application == null){
LOG.warn("Couldn't find application " + applicationId);
return;
}
// Inform the activeUsersManager // Inform the activeUsersManager
activeUsersManager.deactivateApplication(application.getUser(), activeUsersManager.deactivateApplication(application.getUser(),
applicationId); applicationId);
application.stop(finalState);
applications.remove(applicationId); applications.remove(applicationId);
} }

View File

@ -84,6 +84,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
import org.apache.hadoop.yarn.server.utils.BuilderUtils; import org.apache.hadoop.yarn.server.utils.BuilderUtils;
import org.apache.hadoop.yarn.util.ConverterUtils; import org.apache.hadoop.yarn.util.ConverterUtils;
import org.apache.log4j.Level; import org.apache.log4j.Level;
@ -179,7 +180,7 @@ public class TestRMRestart {
am1.registerAppAttempt(); am1.registerAppAttempt();
// AM request for containers // AM request for containers
am1.allocate("127.0.0.1" , 1000, 1, new ArrayList<ContainerId>()); am1.allocate("127.0.0.1" , 1000, 1, new ArrayList<ContainerId>());
// kick the scheduler // kick the scheduler
nm1.nodeHeartbeat(true); nm1.nodeHeartbeat(true);
List<Container> conts = am1.allocate(new ArrayList<ResourceRequest>(), List<Container> conts = am1.allocate(new ArrayList<ResourceRequest>(),
@ -1543,6 +1544,128 @@ public class TestRMRestart {
Assert.assertEquals(2, ((TestMemoryRMStateStore) memStore).updateApp); Assert.assertEquals(2, ((TestMemoryRMStateStore) memStore).updateApp);
} }
@SuppressWarnings("resource")
@Test
public void testQueueMetricsOnRMRestart() throws Exception {
conf.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS,
YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS);
MemoryRMStateStore memStore = new MemoryRMStateStore();
memStore.init(conf);
// PHASE 1: create state in an RM
// start RM
MockRM rm1 = new MockRM(conf, memStore);
rm1.start();
MockNM nm1 =
new MockNM("127.0.0.1:1234", 15120, rm1.getResourceTrackerService());
nm1.registerNode();
QueueMetrics qm1 = rm1.getResourceScheduler().getRootQueueMetrics();
resetQueueMetrics(qm1);
assertQueueMetrics(qm1, 0, 0, 0, 0);
// create app that gets launched and does allocate before RM restart
RMApp app1 = rm1.submitApp(200);
assertQueueMetrics(qm1, 1, 1, 0, 0);
nm1.nodeHeartbeat(true);
RMAppAttempt attempt1 = app1.getCurrentAppAttempt();
ApplicationAttemptId attemptId1 = attempt1.getAppAttemptId();
rm1.waitForState(attemptId1, RMAppAttemptState.ALLOCATED);
MockAM am1 = rm1.sendAMLaunched(attempt1.getAppAttemptId());
am1.registerAppAttempt();
am1.allocate("127.0.0.1" , 1000, 1, new ArrayList<ContainerId>());
nm1.nodeHeartbeat(true);
List<Container> conts = am1.allocate(new ArrayList<ResourceRequest>(),
new ArrayList<ContainerId>()).getAllocatedContainers();
while (conts.size() == 0) {
nm1.nodeHeartbeat(true);
conts.addAll(am1.allocate(new ArrayList<ResourceRequest>(),
new ArrayList<ContainerId>()).getAllocatedContainers());
Thread.sleep(500);
}
assertQueueMetrics(qm1, 1, 0, 1, 0);
// PHASE 2: create new RM and start from old state
// create new RM to represent restart and recover state
MockRM rm2 = new MockRM(conf, memStore);
rm2.start();
nm1.setResourceTrackerService(rm2.getResourceTrackerService());
QueueMetrics qm2 = rm2.getResourceScheduler().getRootQueueMetrics();
resetQueueMetrics(qm2);
assertQueueMetrics(qm2, 0, 0, 0, 0);
// recover app
RMApp loadedApp1 = rm2.getRMContext().getRMApps().get(app1.getApplicationId());
am1.setAMRMProtocol(rm2.getApplicationMasterService());
am1.allocate(new ArrayList<ResourceRequest>(), new ArrayList<ContainerId>());
nm1.nodeHeartbeat(true);
nm1 = new MockNM("127.0.0.1:1234", 15120, rm2.getResourceTrackerService());
List<ContainerStatus> containerStatuses = new ArrayList<ContainerStatus>();
ContainerStatus containerStatus =
BuilderUtils.newContainerStatus(BuilderUtils.newContainerId(loadedApp1
.getCurrentAppAttempt().getAppAttemptId(), 1),
ContainerState.COMPLETE, "Killed AM container", 143);
containerStatuses.add(containerStatus);
nm1.registerNode(containerStatuses);
int timeoutSecs = 0;
while (loadedApp1.getAppAttempts().size() != 2 && timeoutSecs++ < 40) {;
Thread.sleep(200);
}
assertQueueMetrics(qm2, 1, 1, 0, 0);
nm1.nodeHeartbeat(true);
attempt1 = loadedApp1.getCurrentAppAttempt();
attemptId1 = attempt1.getAppAttemptId();
rm2.waitForState(attemptId1, RMAppAttemptState.ALLOCATED);
assertQueueMetrics(qm2, 1, 0, 1, 0);
am1 = rm2.sendAMLaunched(attempt1.getAppAttemptId());
am1.registerAppAttempt();
am1.allocate("127.0.0.1" , 1000, 3, new ArrayList<ContainerId>());
nm1.nodeHeartbeat(true);
conts = am1.allocate(new ArrayList<ResourceRequest>(),
new ArrayList<ContainerId>()).getAllocatedContainers();
while (conts.size() == 0) {
nm1.nodeHeartbeat(true);
conts.addAll(am1.allocate(new ArrayList<ResourceRequest>(),
new ArrayList<ContainerId>()).getAllocatedContainers());
Thread.sleep(500);
}
// finish the AMs
finishApplicationMaster(loadedApp1, rm2, nm1, am1);
assertQueueMetrics(qm2, 1, 0, 0, 1);
// stop RM's
rm2.stop();
rm1.stop();
}
// The metrics has some carry-on value from the previous RM, because the
// test case is in-memory, for the same queue name (e.g. root), there's
// always a singleton QueueMetrics object.
private int appsSubmittedCarryOn = 0;
private int appsPendingCarryOn = 0;
private int appsRunningCarryOn = 0;
private int appsCompletedCarryOn = 0;
private void resetQueueMetrics(QueueMetrics qm) {
appsSubmittedCarryOn = qm.getAppsSubmitted();
appsPendingCarryOn = qm.getAppsPending();
appsRunningCarryOn = qm.getAppsRunning();
appsCompletedCarryOn = qm.getAppsCompleted();
}
private void assertQueueMetrics(QueueMetrics qm, int appsSubmitted,
int appsPending, int appsRunning, int appsCompleted) {
Assert.assertEquals(qm.getAppsSubmitted(),
appsSubmitted + appsSubmittedCarryOn);
Assert.assertEquals(qm.getAppsPending(),
appsPending + appsPendingCarryOn);
Assert.assertEquals(qm.getAppsRunning(),
appsRunning + appsRunningCarryOn);
Assert.assertEquals(qm.getAppsCompleted(),
appsCompleted + appsCompletedCarryOn);
}
public class TestMemoryRMStateStore extends MemoryRMStateStore { public class TestMemoryRMStateStore extends MemoryRMStateStore {
int count = 0; int count = 0;
public int updateApp = 0; public int updateApp = 0;

View File

@ -37,7 +37,7 @@ import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.server.resourcemanager.MockRM; import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fifo.FifoScheduler; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fifo.FifoScheduler;
import org.apache.hadoop.yarn.server.utils.BuilderUtils; import org.apache.hadoop.yarn.server.utils.BuilderUtils;
import org.apache.hadoop.yarn.util.resource.Resources; import org.apache.hadoop.yarn.util.resource.Resources;
@ -66,8 +66,10 @@ public class TestQueueMetrics {
MetricsSource queueSource= queueSource(ms, queueName); MetricsSource queueSource= queueSource(ms, queueName);
AppSchedulingInfo app = mockApp(user); AppSchedulingInfo app = mockApp(user);
metrics.submitApp(user, 1); metrics.submitApp(user);
MetricsSource userSource = userSource(ms, queueName, user); MetricsSource userSource = userSource(ms, queueName, user);
checkApps(queueSource, 1, 0, 0, 0, 0, 0, true);
metrics.submitAppAttempt(user);
checkApps(queueSource, 1, 1, 0, 0, 0, 0, true); checkApps(queueSource, 1, 1, 0, 0, 0, 0, true);
metrics.setAvailableResourcesToQueue(Resources.createResource(100*GB, 100)); metrics.setAvailableResourcesToQueue(Resources.createResource(100*GB, 100));
@ -76,7 +78,7 @@ public class TestQueueMetrics {
// configurable cluster/queue resources // configurable cluster/queue resources
checkResources(queueSource, 0, 0, 0, 0, 0, 100*GB, 100, 15*GB, 15, 5, 0, 0, 0); checkResources(queueSource, 0, 0, 0, 0, 0, 100*GB, 100, 15*GB, 15, 5, 0, 0, 0);
metrics.incrAppsRunning(app, user); metrics.runAppAttempt(app.getApplicationId(), user);
checkApps(queueSource, 1, 0, 1, 0, 0, 0, true); checkApps(queueSource, 1, 0, 1, 0, 0, 0, true);
metrics.allocateResources(user, 3, Resources.createResource(2*GB, 2)); metrics.allocateResources(user, 3, Resources.createResource(2*GB, 2));
@ -85,7 +87,10 @@ public class TestQueueMetrics {
metrics.releaseResources(user, 1, Resources.createResource(2*GB, 2)); metrics.releaseResources(user, 1, Resources.createResource(2*GB, 2));
checkResources(queueSource, 4*GB, 4, 2, 3, 1, 100*GB, 100, 9*GB, 9, 2, 0, 0, 0); checkResources(queueSource, 4*GB, 4, 2, 3, 1, 100*GB, 100, 9*GB, 9, 2, 0, 0, 0);
metrics.finishApp(app, RMAppAttemptState.FINISHED); metrics.finishAppAttempt(
app.getApplicationId(), app.isPending(), app.getUser());
checkApps(queueSource, 1, 0, 0, 0, 0, 0, true);
metrics.finishApp(user, RMAppState.FINISHED);
checkApps(queueSource, 1, 0, 0, 1, 0, 0, true); checkApps(queueSource, 1, 0, 0, 1, 0, 0, true);
assertNull(userSource); assertNull(userSource);
} }
@ -100,39 +105,47 @@ public class TestQueueMetrics {
MetricsSource queueSource = queueSource(ms, queueName); MetricsSource queueSource = queueSource(ms, queueName);
AppSchedulingInfo app = mockApp(user); AppSchedulingInfo app = mockApp(user);
metrics.submitApp(user, 1); metrics.submitApp(user);
MetricsSource userSource = userSource(ms, queueName, user); MetricsSource userSource = userSource(ms, queueName, user);
checkApps(queueSource, 1, 0, 0, 0, 0, 0, true);
metrics.submitAppAttempt(user);
checkApps(queueSource, 1, 1, 0, 0, 0, 0, true); checkApps(queueSource, 1, 1, 0, 0, 0, 0, true);
metrics.incrAppsRunning(app, user); metrics.runAppAttempt(app.getApplicationId(), user);
checkApps(queueSource, 1, 0, 1, 0, 0, 0, true); checkApps(queueSource, 1, 0, 1, 0, 0, 0, true);
metrics.finishApp(app, RMAppAttemptState.FAILED); metrics.finishAppAttempt(
checkApps(queueSource, 1, 0, 0, 0, 1, 0, true); app.getApplicationId(), app.isPending(), app.getUser());
checkApps(queueSource, 1, 0, 0, 0, 0, 0, true);
// As the application has failed, framework retries the same application // As the application has failed, framework retries the same application
// based on configuration // based on configuration
metrics.submitApp(user, 2); metrics.submitAppAttempt(user);
checkApps(queueSource, 1, 1, 0, 0, 0, 0, true); checkApps(queueSource, 1, 1, 0, 0, 0, 0, true);
metrics.incrAppsRunning(app, user); metrics.runAppAttempt(app.getApplicationId(), user);
checkApps(queueSource, 1, 0, 1, 0, 0, 0, true); checkApps(queueSource, 1, 0, 1, 0, 0, 0, true);
// Suppose say application has failed this time as well. // Suppose say application has failed this time as well.
metrics.finishApp(app, RMAppAttemptState.FAILED); metrics.finishAppAttempt(
checkApps(queueSource, 1, 0, 0, 0, 1, 0, true); app.getApplicationId(), app.isPending(), app.getUser());
checkApps(queueSource, 1, 0, 0, 0, 0, 0, true);
// As the application has failed, framework retries the same application // As the application has failed, framework retries the same application
// based on configuration // based on configuration
metrics.submitApp(user, 3); metrics.submitAppAttempt(user);
checkApps(queueSource, 1, 1, 0, 0, 0, 0, true); checkApps(queueSource, 1, 1, 0, 0, 0, 0, true);
metrics.incrAppsRunning(app, user); metrics.runAppAttempt(app.getApplicationId(), user);
checkApps(queueSource, 1, 0, 1, 0, 0, 0, true); checkApps(queueSource, 1, 0, 1, 0, 0, 0, true);
// Suppose say application has finished. // Suppose say application has failed, and there's no more retries.
metrics.finishApp(app, RMAppAttemptState.FINISHED); metrics.finishAppAttempt(
checkApps(queueSource, 1, 0, 0, 1, 0, 0, true); app.getApplicationId(), app.isPending(), app.getUser());
checkApps(queueSource, 1, 0, 0, 0, 0, 0, true);
metrics.finishApp(user, RMAppState.FAILED);
checkApps(queueSource, 1, 0, 0, 0, 1, 0, true);
assertNull(userSource); assertNull(userSource);
} }
@ -146,9 +159,13 @@ public class TestQueueMetrics {
MetricsSource queueSource = queueSource(ms, queueName); MetricsSource queueSource = queueSource(ms, queueName);
AppSchedulingInfo app = mockApp(user); AppSchedulingInfo app = mockApp(user);
metrics.submitApp(user, 1); metrics.submitApp(user);
MetricsSource userSource = userSource(ms, queueName, user); MetricsSource userSource = userSource(ms, queueName, user);
checkApps(queueSource, 1, 0, 0, 0, 0, 0, true);
checkApps(userSource, 1, 0, 0, 0, 0, 0, true);
metrics.submitAppAttempt(user);
checkApps(queueSource, 1, 1, 0, 0, 0, 0, true); checkApps(queueSource, 1, 1, 0, 0, 0, 0, true);
checkApps(userSource, 1, 1, 0, 0, 0, 0, true); checkApps(userSource, 1, 1, 0, 0, 0, 0, true);
@ -160,7 +177,7 @@ public class TestQueueMetrics {
checkResources(queueSource, 0, 0, 0, 0, 0, 100*GB, 100, 15*GB, 15, 5, 0, 0, 0); checkResources(queueSource, 0, 0, 0, 0, 0, 100*GB, 100, 15*GB, 15, 5, 0, 0, 0);
checkResources(userSource, 0, 0, 0, 0, 0, 10*GB, 10, 15*GB, 15, 5, 0, 0, 0); checkResources(userSource, 0, 0, 0, 0, 0, 10*GB, 10, 15*GB, 15, 5, 0, 0, 0);
metrics.incrAppsRunning(app, user); metrics.runAppAttempt(app.getApplicationId(), user);
checkApps(queueSource, 1, 0, 1, 0, 0, 0, true); checkApps(queueSource, 1, 0, 1, 0, 0, 0, true);
checkApps(userSource, 1, 0, 1, 0, 0, 0, true); checkApps(userSource, 1, 0, 1, 0, 0, 0, true);
@ -172,7 +189,11 @@ public class TestQueueMetrics {
checkResources(queueSource, 4*GB, 4, 2, 3, 1, 100*GB, 100, 9*GB, 9, 2, 0, 0, 0); checkResources(queueSource, 4*GB, 4, 2, 3, 1, 100*GB, 100, 9*GB, 9, 2, 0, 0, 0);
checkResources(userSource, 4*GB, 4, 2, 3, 1, 10*GB, 10, 9*GB, 9, 2, 0, 0, 0); checkResources(userSource, 4*GB, 4, 2, 3, 1, 10*GB, 10, 9*GB, 9, 2, 0, 0, 0);
metrics.finishApp(app, RMAppAttemptState.FINISHED); metrics.finishAppAttempt(
app.getApplicationId(), app.isPending(), app.getUser());
checkApps(queueSource, 1, 0, 0, 0, 0, 0, true);
checkApps(userSource, 1, 0, 0, 0, 0, 0, true);
metrics.finishApp(user, RMAppState.FINISHED);
checkApps(queueSource, 1, 0, 0, 1, 0, 0, true); checkApps(queueSource, 1, 0, 0, 1, 0, 0, true);
checkApps(userSource, 1, 0, 0, 1, 0, 0, true); checkApps(userSource, 1, 0, 0, 1, 0, 0, true);
} }
@ -192,10 +213,16 @@ public class TestQueueMetrics {
MetricsSource queueSource = queueSource(ms, leafQueueName); MetricsSource queueSource = queueSource(ms, leafQueueName);
AppSchedulingInfo app = mockApp(user); AppSchedulingInfo app = mockApp(user);
metrics.submitApp(user, 1); metrics.submitApp(user);
MetricsSource userSource = userSource(ms, leafQueueName, user); MetricsSource userSource = userSource(ms, leafQueueName, user);
MetricsSource parentUserSource = userSource(ms, parentQueueName, user); MetricsSource parentUserSource = userSource(ms, parentQueueName, user);
checkApps(queueSource, 1, 0, 0, 0, 0, 0, true);
checkApps(parentQueueSource, 1, 0, 0, 0, 0, 0, true);
checkApps(userSource, 1, 0, 0, 0, 0, 0, true);
checkApps(parentUserSource, 1, 0, 0, 0, 0, 0, true);
metrics.submitAppAttempt(user);
checkApps(queueSource, 1, 1, 0, 0, 0, 0, true); checkApps(queueSource, 1, 1, 0, 0, 0, 0, true);
checkApps(parentQueueSource, 1, 1, 0, 0, 0, 0, true); checkApps(parentQueueSource, 1, 1, 0, 0, 0, 0, true);
checkApps(userSource, 1, 1, 0, 0, 0, 0, true); checkApps(userSource, 1, 1, 0, 0, 0, 0, true);
@ -211,7 +238,7 @@ public class TestQueueMetrics {
checkResources(userSource, 0, 0, 0, 0, 0, 10*GB, 10, 15*GB, 15, 5, 0, 0, 0); checkResources(userSource, 0, 0, 0, 0, 0, 10*GB, 10, 15*GB, 15, 5, 0, 0, 0);
checkResources(parentUserSource, 0, 0, 0, 0, 0, 10*GB, 10, 15*GB, 15, 5, 0, 0, 0); checkResources(parentUserSource, 0, 0, 0, 0, 0, 10*GB, 10, 15*GB, 15, 5, 0, 0, 0);
metrics.incrAppsRunning(app, user); metrics.runAppAttempt(app.getApplicationId(), user);
checkApps(queueSource, 1, 0, 1, 0, 0, 0, true); checkApps(queueSource, 1, 0, 1, 0, 0, 0, true);
checkApps(userSource, 1, 0, 1, 0, 0, 0, true); checkApps(userSource, 1, 0, 1, 0, 0, 0, true);
@ -231,7 +258,14 @@ public class TestQueueMetrics {
checkResources(userSource, 4*GB, 4, 2, 3, 1, 10*GB, 10, 9*GB, 9, 2, 0, 0, 0); checkResources(userSource, 4*GB, 4, 2, 3, 1, 10*GB, 10, 9*GB, 9, 2, 0, 0, 0);
checkResources(parentUserSource, 4*GB, 4, 2, 3, 1, 10*GB, 10, 9*GB, 9, 2, 0, 0, 0); checkResources(parentUserSource, 4*GB, 4, 2, 3, 1, 10*GB, 10, 9*GB, 9, 2, 0, 0, 0);
metrics.finishApp(app, RMAppAttemptState.FINISHED); metrics.finishAppAttempt(
app.getApplicationId(), app.isPending(), app.getUser());
checkApps(queueSource, 1, 0, 0, 0, 0, 0, true);
checkApps(parentQueueSource, 1, 0, 0, 0, 0, 0, true);
checkApps(userSource, 1, 0, 0, 0, 0, 0, true);
checkApps(parentUserSource, 1, 0, 0, 0, 0, 0, true);
metrics.finishApp(user, RMAppState.FINISHED);
checkApps(queueSource, 1, 0, 0, 1, 0, 0, true); checkApps(queueSource, 1, 0, 0, 1, 0, 0, true);
checkApps(parentQueueSource, 1, 0, 0, 1, 0, 0, true); checkApps(parentQueueSource, 1, 0, 0, 1, 0, 0, true);
checkApps(userSource, 1, 0, 0, 1, 0, 0, true); checkApps(userSource, 1, 0, 0, 1, 0, 0, true);
@ -308,7 +342,7 @@ public class TestQueueMetrics {
assertGauge("AppsPending", pending, rb); assertGauge("AppsPending", pending, rb);
assertGauge("AppsRunning", running, rb); assertGauge("AppsRunning", running, rb);
assertCounter("AppsCompleted", completed, rb); assertCounter("AppsCompleted", completed, rb);
assertGauge("AppsFailed", failed, rb); assertCounter("AppsFailed", failed, rb);
assertCounter("AppsKilled", killed, rb); assertCounter("AppsKilled", killed, rb);
} }

View File

@ -55,6 +55,7 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.factories.RecordFactory; import org.apache.hadoop.yarn.factories.RecordFactory;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext; import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEventType; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEventType;
@ -66,6 +67,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaS
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAddedSchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAddedSchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAttemptAddedSchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAttemptAddedSchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAttemptRemovedSchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAttemptRemovedSchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppRemovedSchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager; import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager;
import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator; import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator;
import org.apache.hadoop.yarn.util.resource.ResourceCalculator; import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
@ -360,7 +362,7 @@ public class TestLeafQueue {
cs.handle(event); cs.handle(event);
assertEquals(0, a.getMetrics().getAppsPending()); assertEquals(0, a.getMetrics().getAppsPending());
assertEquals(1, a.getMetrics().getAppsFailed()); assertEquals(0, a.getMetrics().getAppsFailed());
// Attempt the same application again // Attempt the same application again
final ApplicationAttemptId appAttemptId_1 = TestUtils final ApplicationAttemptId appAttemptId_1 = TestUtils
@ -375,6 +377,9 @@ public class TestLeafQueue {
event = new AppAttemptRemovedSchedulerEvent(appAttemptId_0, event = new AppAttemptRemovedSchedulerEvent(appAttemptId_0,
RMAppAttemptState.FINISHED, false); RMAppAttemptState.FINISHED, false);
cs.handle(event); cs.handle(event);
AppRemovedSchedulerEvent rEvent = new AppRemovedSchedulerEvent(
appAttemptId_0.getApplicationId(), RMAppState.FINISHED);
cs.handle(rEvent);
assertEquals(1, a.getMetrics().getAppsSubmitted()); assertEquals(1, a.getMetrics().getAppsSubmitted());
assertEquals(0, a.getMetrics().getAppsPending()); assertEquals(0, a.getMetrics().getAppsPending());

View File

@ -70,7 +70,6 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAttemptA
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeAddedSchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeAddedSchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateSchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateSchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.security.ClientToAMTokenSecretManagerInRM;
import org.apache.hadoop.yarn.server.resourcemanager.security.NMTokenSecretManagerInRM; import org.apache.hadoop.yarn.server.resourcemanager.security.NMTokenSecretManagerInRM;
import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager; import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager;
import org.apache.hadoop.yarn.server.utils.BuilderUtils; import org.apache.hadoop.yarn.server.utils.BuilderUtils;
@ -160,11 +159,6 @@ public class TestFifoScheduler {
schedular.handle(attemptEvent); schedular.handle(attemptEvent);
appAttemptId = BuilderUtils.newApplicationAttemptId(appId, 2); appAttemptId = BuilderUtils.newApplicationAttemptId(appId, 2);
SchedulerEvent appEvent2 =
new AppAddedSchedulerEvent(appAttemptId.getApplicationId(), "queue",
"user");
schedular.handle(appEvent2);
SchedulerEvent attemptEvent2 = SchedulerEvent attemptEvent2 =
new AppAttemptAddedSchedulerEvent(appAttemptId, false); new AppAttemptAddedSchedulerEvent(appAttemptId, false);
schedular.handle(attemptEvent2); schedular.handle(attemptEvent2);