MAPREDUCE-3870. Invalid App Metrics (Bhallamudi Venkata Siva Kamesh via tgraves).

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1341160 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Thomas Graves 2012-05-21 19:15:41 +00:00
parent 2eebc21a55
commit d74bec2f88
6 changed files with 93 additions and 16 deletions

View File

@ -517,6 +517,9 @@ Release 0.23.3 - UNRELEASED
MAPREDUCE-4269. documentation: Gridmix has javadoc warnings in MAPREDUCE-4269. documentation: Gridmix has javadoc warnings in
StressJobFactory (Jonathon Eagles via tgraves). StressJobFactory (Jonathon Eagles via tgraves).
MAPREDUCE-3870. Invalid App Metrics
(Bhallamudi Venkata Siva Kamesh via tgraves).
Release 0.23.2 - UNRELEASED Release 0.23.2 - UNRELEASED
INCOMPATIBLE CHANGES INCOMPATIBLE CHANGES

View File

@ -55,7 +55,7 @@ public class QueueMetrics implements MetricsSource {
@Metric("# of pending apps") MutableGaugeInt appsPending; @Metric("# of pending apps") MutableGaugeInt appsPending;
@Metric("# of apps completed") MutableCounterInt appsCompleted; @Metric("# of apps completed") MutableCounterInt appsCompleted;
@Metric("# of apps killed") MutableCounterInt appsKilled; @Metric("# of apps killed") MutableCounterInt appsKilled;
@Metric("# of apps failed") MutableCounterInt appsFailed; @Metric("# of apps failed") MutableGaugeInt appsFailed;
@Metric("Allocated memory in MB") MutableGaugeInt allocatedMB; @Metric("Allocated memory in MB") MutableGaugeInt allocatedMB;
@Metric("# of allocated containers") MutableGaugeInt allocatedContainers; @Metric("# of allocated containers") MutableGaugeInt allocatedContainers;
@ -181,15 +181,19 @@ public void getMetrics(MetricsCollector collector, boolean all) {
registry.snapshot(collector.addRecord(registry.info()), all); registry.snapshot(collector.addRecord(registry.info()), all);
} }
public void submitApp(String user) { public void submitApp(String user, int attemptId) {
if (attemptId == 1) {
appsSubmitted.incr(); appsSubmitted.incr();
} else {
appsFailed.decr();
}
appsPending.incr(); appsPending.incr();
QueueMetrics userMetrics = getUserMetrics(user); QueueMetrics userMetrics = getUserMetrics(user);
if (userMetrics != null) { if (userMetrics != null) {
userMetrics.submitApp(user); userMetrics.submitApp(user, attemptId);
} }
if (parent != null) { if (parent != null) {
parent.submitApp(user); parent.submitApp(user, attemptId);
} }
} }

View File

@ -632,9 +632,7 @@ public void submitApplication(SchedulerApp application, String userName,
} }
int attemptId = application.getApplicationAttemptId().getAttemptId(); int attemptId = application.getApplicationAttemptId().getAttemptId();
if (attemptId == 1) { metrics.submitApp(userName, attemptId);
metrics.submitApp(userName);
}
// Inform the parent queue // Inform the parent queue
try { try {

View File

@ -302,9 +302,7 @@ private synchronized void addApplication(ApplicationAttemptId appAttemptId,
new SchedulerApp(appAttemptId, user, DEFAULT_QUEUE, activeUsersManager, new SchedulerApp(appAttemptId, user, DEFAULT_QUEUE, activeUsersManager,
this.rmContext, null); this.rmContext, null);
applications.put(appAttemptId, schedulerApp); applications.put(appAttemptId, schedulerApp);
if (appAttemptId.getAttemptId() == 1) { metrics.submitApp(user, appAttemptId.getAttemptId());
metrics.submitApp(user);
}
LOG.info("Application Submission: " + appAttemptId.getApplicationId() + LOG.info("Application Submission: " + appAttemptId.getApplicationId() +
" from " + user + ", currently active: " + applications.size()); " from " + user + ", currently active: " + applications.size());
rmContext.getDispatcher().getEventHandler().handle( rmContext.getDispatcher().getEventHandler().handle(

View File

@ -32,9 +32,12 @@
import org.apache.hadoop.metrics2.MetricsSource; import org.apache.hadoop.metrics2.MetricsSource;
import org.apache.hadoop.metrics2.MetricsSystem; import org.apache.hadoop.metrics2.MetricsSystem;
import org.apache.hadoop.metrics2.impl.MetricsSystemImpl; import org.apache.hadoop.metrics2.impl.MetricsSystemImpl;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.server.resourcemanager.resource.Resource; import org.apache.hadoop.yarn.server.resourcemanager.resource.Resource;
import org.apache.hadoop.yarn.server.resourcemanager.resource.Resources; import org.apache.hadoop.yarn.server.resourcemanager.resource.Resources;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
import org.apache.hadoop.yarn.util.BuilderUtils;
import org.junit.Test; import org.junit.Test;
public class TestQueueMetrics { public class TestQueueMetrics {
@ -52,7 +55,7 @@ public class TestQueueMetrics {
MetricsSource queueSource= queueSource(ms, queueName); MetricsSource queueSource= queueSource(ms, queueName);
AppSchedulingInfo app = mockApp(user); AppSchedulingInfo app = mockApp(user);
metrics.submitApp(user); metrics.submitApp(user, 1);
MetricsSource userSource = userSource(ms, queueName, user); MetricsSource userSource = userSource(ms, queueName, user);
checkApps(queueSource, 1, 1, 0, 0, 0, 0); checkApps(queueSource, 1, 1, 0, 0, 0, 0);
@ -76,6 +79,53 @@ public class TestQueueMetrics {
assertNull(userSource); assertNull(userSource);
} }
@Test
public void testQueueAppMetricsForMultipleFailures() {
String queueName = "single";
String user = "alice";
QueueMetrics metrics = QueueMetrics.forQueue(ms, queueName, null, false,
new Configuration());
MetricsSource queueSource = queueSource(ms, queueName);
AppSchedulingInfo app = mockApp(user);
metrics.submitApp(user, 1);
MetricsSource userSource = userSource(ms, queueName, user);
checkApps(queueSource, 1, 1, 0, 0, 0, 0);
metrics.incrAppsRunning(app, user);
checkApps(queueSource, 1, 0, 1, 0, 0, 0);
metrics.finishApp(app, RMAppAttemptState.FAILED);
checkApps(queueSource, 1, 0, 0, 0, 1, 0);
// As the application has failed, framework retries the same application
// based on configuration
metrics.submitApp(user, 2);
checkApps(queueSource, 1, 1, 0, 0, 0, 0);
metrics.incrAppsRunning(app, user);
checkApps(queueSource, 1, 0, 1, 0, 0, 0);
// Suppose say application has failed this time as well.
metrics.finishApp(app, RMAppAttemptState.FAILED);
checkApps(queueSource, 1, 0, 0, 0, 1, 0);
// As the application has failed, framework retries the same application
// based on configuration
metrics.submitApp(user, 3);
checkApps(queueSource, 1, 1, 0, 0, 0, 0);
metrics.incrAppsRunning(app, user);
checkApps(queueSource, 1, 0, 1, 0, 0, 0);
// Suppose say application has finished.
metrics.finishApp(app, RMAppAttemptState.FINISHED);
checkApps(queueSource, 1, 0, 0, 1, 0, 0);
assertNull(userSource);
}
@Test public void testSingleQueueWithUserMetrics() { @Test public void testSingleQueueWithUserMetrics() {
String queueName = "single2"; String queueName = "single2";
String user = "dodo"; String user = "dodo";
@ -85,7 +135,7 @@ public class TestQueueMetrics {
MetricsSource queueSource = queueSource(ms, queueName); MetricsSource queueSource = queueSource(ms, queueName);
AppSchedulingInfo app = mockApp(user); AppSchedulingInfo app = mockApp(user);
metrics.submitApp(user); metrics.submitApp(user, 1);
MetricsSource userSource = userSource(ms, queueName, user); MetricsSource userSource = userSource(ms, queueName, user);
checkApps(queueSource, 1, 1, 0, 0, 0, 0); checkApps(queueSource, 1, 1, 0, 0, 0, 0);
@ -131,7 +181,7 @@ public class TestQueueMetrics {
MetricsSource queueSource = queueSource(ms, leafQueueName); MetricsSource queueSource = queueSource(ms, leafQueueName);
AppSchedulingInfo app = mockApp(user); AppSchedulingInfo app = mockApp(user);
metrics.submitApp(user); metrics.submitApp(user, 1);
MetricsSource userSource = userSource(ms, leafQueueName, user); MetricsSource userSource = userSource(ms, leafQueueName, user);
MetricsSource parentUserSource = userSource(ms, parentQueueName, user); MetricsSource parentUserSource = userSource(ms, parentQueueName, user);
@ -184,7 +234,7 @@ public static void checkApps(MetricsSource source, int submitted, int pending,
assertGauge("AppsPending", pending, rb); assertGauge("AppsPending", pending, rb);
assertGauge("AppsRunning", running, rb); assertGauge("AppsRunning", running, rb);
assertCounter("AppsCompleted", completed, rb); assertCounter("AppsCompleted", completed, rb);
assertCounter("AppsFailed", failed, rb); assertGauge("AppsFailed", failed, rb);
assertCounter("AppsKilled", killed, rb); assertCounter("AppsKilled", killed, rb);
} }
@ -207,6 +257,9 @@ public static void checkResources(MetricsSource source, int allocatedMB,
private static AppSchedulingInfo mockApp(String user) { private static AppSchedulingInfo mockApp(String user) {
AppSchedulingInfo app = mock(AppSchedulingInfo.class); AppSchedulingInfo app = mock(AppSchedulingInfo.class);
when(app.getUser()).thenReturn(user); when(app.getUser()).thenReturn(user);
ApplicationId appId = BuilderUtils.newApplicationId(1, 1);
ApplicationAttemptId id = BuilderUtils.newApplicationAttemptId(appId, 1);
when(app.getApplicationAttemptId()).thenReturn(id);
return app; return app;
} }

View File

@ -55,6 +55,7 @@
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext; import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import org.apache.hadoop.yarn.server.resourcemanager.resource.Resources; import org.apache.hadoop.yarn.server.resourcemanager.resource.Resources;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEventType; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEventType;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeImpl; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeImpl;
@ -63,6 +64,7 @@
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApp; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApp;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppRemovedSchedulerEvent;
import org.junit.After; import org.junit.After;
import org.junit.Before; import org.junit.Before;
import org.junit.Test; import org.junit.Test;
@ -89,7 +91,8 @@ public class TestLeafQueue {
@Before @Before
public void setUp() throws Exception { public void setUp() throws Exception {
cs = new CapacityScheduler(); CapacityScheduler spyCs = new CapacityScheduler();
cs = spy(spyCs);
rmContext = TestUtils.getMockRMContext(); rmContext = TestUtils.getMockRMContext();
csConf = csConf =
@ -310,6 +313,14 @@ public void testAppAttemptMetrics() throws Exception {
rmContext, null); rmContext, null);
a.submitApplication(app_0, user_0, B); a.submitApplication(app_0, user_0, B);
when(cs.getApplication(appAttemptId_0)).thenReturn(app_0);
AppRemovedSchedulerEvent event = new AppRemovedSchedulerEvent(
appAttemptId_0, RMAppAttemptState.FAILED);
cs.handle(event);
assertEquals(0, a.getMetrics().getAppsPending());
assertEquals(1, a.getMetrics().getAppsFailed());
// Attempt the same application again // Attempt the same application again
final ApplicationAttemptId appAttemptId_1 = TestUtils final ApplicationAttemptId appAttemptId_1 = TestUtils
.getMockApplicationAttemptId(0, 2); .getMockApplicationAttemptId(0, 2);
@ -320,6 +331,16 @@ public void testAppAttemptMetrics() throws Exception {
assertEquals(1, a.getMetrics().getAppsSubmitted()); assertEquals(1, a.getMetrics().getAppsSubmitted());
assertEquals(1, a.getMetrics().getAppsPending()); assertEquals(1, a.getMetrics().getAppsPending());
when(cs.getApplication(appAttemptId_1)).thenReturn(app_0);
event = new AppRemovedSchedulerEvent(appAttemptId_0,
RMAppAttemptState.FINISHED);
cs.handle(event);
assertEquals(1, a.getMetrics().getAppsSubmitted());
assertEquals(0, a.getMetrics().getAppsPending());
assertEquals(0, a.getMetrics().getAppsFailed());
assertEquals(1, a.getMetrics().getAppsCompleted());
QueueMetrics userMetrics = a.getMetrics().getUserMetrics(user_0); QueueMetrics userMetrics = a.getMetrics().getUserMetrics(user_0);
assertEquals(1, userMetrics.getAppsSubmitted()); assertEquals(1, userMetrics.getAppsSubmitted());
} }