Merge -c 1512081 from trunk to branch-2 to fix YARN-1043. Push all metrics consistently. Contributed by Jian He.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1512082 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Arun Murthy 2013-08-08 22:55:33 +00:00
parent 22743ed74b
commit 25038197b5
5 changed files with 56 additions and 46 deletions

View File

@ -384,7 +384,7 @@ public class MetricsSystemImpl extends MetricsSystem implements MetricsSource {
private void snapshotMetrics(MetricsSourceAdapter sa, private void snapshotMetrics(MetricsSourceAdapter sa,
MetricsBufferBuilder bufferBuilder) { MetricsBufferBuilder bufferBuilder) {
long startTime = Time.now(); long startTime = Time.now();
bufferBuilder.add(sa.name(), sa.getMetrics(collector, false)); bufferBuilder.add(sa.name(), sa.getMetrics(collector, true));
collector.clear(); collector.clear();
snapshotStat.add(Time.now() - startTime); snapshotStat.add(Time.now() - startTime);
LOG.debug("Snapshotted source "+ sa.name()); LOG.debug("Snapshotted source "+ sa.name());

View File

@ -845,6 +845,8 @@ Release 2.1.0-beta - 2013-08-06
YARN-909. Disable TestLinuxContainerExecutorWithMocks on Windows. (Chuan Liu YARN-909. Disable TestLinuxContainerExecutorWithMocks on Windows. (Chuan Liu
via cnauroth) via cnauroth)
YARN-1043. Push all metrics consistently. (Jian He via acmurthy)
Release 2.0.5-alpha - 06/06/2013 Release 2.0.5-alpha - 06/06/2013
INCOMPATIBLE CHANGES INCOMPATIBLE CHANGES

View File

@ -126,18 +126,6 @@ public class QueueMetrics implements MetricsSource {
enableUserMetrics, conf); enableUserMetrics, conf);
} }
// this method is here because we want to make sure these metrics show up on
// queue registration.
public void initMetrics() {
appsSubmitted.incr(0);
appsRunning.incr(0);
appsPending.incr(0);
appsCompleted.incr(0);
appsKilled.incr(0);
appsFailed.incr(0);
reservedContainers.incr(0);
}
/** /**
* Helper method to clear cache - used only for unit tests. * Helper method to clear cache - used only for unit tests.
*/ */
@ -168,7 +156,6 @@ public class QueueMetrics implements MetricsSource {
ms.register( ms.register(
sourceName(queueName).toString(), sourceName(queueName).toString(),
"Metrics for queue: " + queueName, metrics); "Metrics for queue: " + queueName, metrics);
metrics.initMetrics();
} }
queueMetrics.put(queueName, metrics); queueMetrics.put(queueName, metrics);
} }

View File

@ -94,7 +94,6 @@ public class FSQueueMetrics extends QueueMetrics {
metrics = ms.register( metrics = ms.register(
sourceName(queueName).toString(), sourceName(queueName).toString(),
"Metrics for queue: " + queueName, metrics); "Metrics for queue: " + queueName, metrics);
metrics.initMetrics();
} }
queueMetrics.put(queueName, metrics); queueMetrics.put(queueName, metrics);
} }

View File

@ -68,7 +68,7 @@ public class TestQueueMetrics {
metrics.submitApp(user, 1); metrics.submitApp(user, 1);
MetricsSource userSource = userSource(ms, queueName, user); MetricsSource userSource = userSource(ms, queueName, user);
checkApps(queueSource, 1, 1, 0, 0, 0, 0); checkApps(queueSource, 1, 1, 0, 0, 0, 0, true);
metrics.setAvailableResourcesToQueue(Resources.createResource(100*GB, 100)); metrics.setAvailableResourcesToQueue(Resources.createResource(100*GB, 100));
metrics.incrPendingResources(user, 5, Resources.createResource(15*GB, 15)); metrics.incrPendingResources(user, 5, Resources.createResource(15*GB, 15));
@ -77,7 +77,7 @@ public class TestQueueMetrics {
checkResources(queueSource, 0, 0, 0, 0, 0, 100*GB, 100, 15*GB, 15, 5, 0, 0, 0); checkResources(queueSource, 0, 0, 0, 0, 0, 100*GB, 100, 15*GB, 15, 5, 0, 0, 0);
metrics.incrAppsRunning(app, user); metrics.incrAppsRunning(app, user);
checkApps(queueSource, 1, 0, 1, 0, 0, 0); checkApps(queueSource, 1, 0, 1, 0, 0, 0, true);
metrics.allocateResources(user, 3, Resources.createResource(2*GB, 2)); metrics.allocateResources(user, 3, Resources.createResource(2*GB, 2));
checkResources(queueSource, 6*GB, 6, 3, 3, 0, 100*GB, 100, 9*GB, 9, 2, 0, 0, 0); checkResources(queueSource, 6*GB, 6, 3, 3, 0, 100*GB, 100, 9*GB, 9, 2, 0, 0, 0);
@ -86,7 +86,7 @@ public class TestQueueMetrics {
checkResources(queueSource, 4*GB, 4, 2, 3, 1, 100*GB, 100, 9*GB, 9, 2, 0, 0, 0); checkResources(queueSource, 4*GB, 4, 2, 3, 1, 100*GB, 100, 9*GB, 9, 2, 0, 0, 0);
metrics.finishApp(app, RMAppAttemptState.FINISHED); metrics.finishApp(app, RMAppAttemptState.FINISHED);
checkApps(queueSource, 1, 0, 0, 1, 0, 0); checkApps(queueSource, 1, 0, 0, 1, 0, 0, true);
assertNull(userSource); assertNull(userSource);
} }
@ -102,37 +102,37 @@ public class TestQueueMetrics {
metrics.submitApp(user, 1); metrics.submitApp(user, 1);
MetricsSource userSource = userSource(ms, queueName, user); MetricsSource userSource = userSource(ms, queueName, user);
checkApps(queueSource, 1, 1, 0, 0, 0, 0); checkApps(queueSource, 1, 1, 0, 0, 0, 0, true);
metrics.incrAppsRunning(app, user); metrics.incrAppsRunning(app, user);
checkApps(queueSource, 1, 0, 1, 0, 0, 0); checkApps(queueSource, 1, 0, 1, 0, 0, 0, true);
metrics.finishApp(app, RMAppAttemptState.FAILED); metrics.finishApp(app, RMAppAttemptState.FAILED);
checkApps(queueSource, 1, 0, 0, 0, 1, 0); checkApps(queueSource, 1, 0, 0, 0, 1, 0, true);
// As the application has failed, framework retries the same application // As the application has failed, framework retries the same application
// based on configuration // based on configuration
metrics.submitApp(user, 2); metrics.submitApp(user, 2);
checkApps(queueSource, 1, 1, 0, 0, 0, 0); checkApps(queueSource, 1, 1, 0, 0, 0, 0, true);
metrics.incrAppsRunning(app, user); metrics.incrAppsRunning(app, user);
checkApps(queueSource, 1, 0, 1, 0, 0, 0); checkApps(queueSource, 1, 0, 1, 0, 0, 0, true);
// Suppose say application has failed this time as well. // Suppose say application has failed this time as well.
metrics.finishApp(app, RMAppAttemptState.FAILED); metrics.finishApp(app, RMAppAttemptState.FAILED);
checkApps(queueSource, 1, 0, 0, 0, 1, 0); checkApps(queueSource, 1, 0, 0, 0, 1, 0, true);
// As the application has failed, framework retries the same application // As the application has failed, framework retries the same application
// based on configuration // based on configuration
metrics.submitApp(user, 3); metrics.submitApp(user, 3);
checkApps(queueSource, 1, 1, 0, 0, 0, 0); checkApps(queueSource, 1, 1, 0, 0, 0, 0, true);
metrics.incrAppsRunning(app, user); metrics.incrAppsRunning(app, user);
checkApps(queueSource, 1, 0, 1, 0, 0, 0); checkApps(queueSource, 1, 0, 1, 0, 0, 0, true);
// Suppose say application has finished. // Suppose say application has finished.
metrics.finishApp(app, RMAppAttemptState.FINISHED); metrics.finishApp(app, RMAppAttemptState.FINISHED);
checkApps(queueSource, 1, 0, 0, 1, 0, 0); checkApps(queueSource, 1, 0, 0, 1, 0, 0, true);
assertNull(userSource); assertNull(userSource);
} }
@ -149,8 +149,8 @@ public class TestQueueMetrics {
metrics.submitApp(user, 1); metrics.submitApp(user, 1);
MetricsSource userSource = userSource(ms, queueName, user); MetricsSource userSource = userSource(ms, queueName, user);
checkApps(queueSource, 1, 1, 0, 0, 0, 0); checkApps(queueSource, 1, 1, 0, 0, 0, 0, true);
checkApps(userSource, 1, 1, 0, 0, 0, 0); checkApps(userSource, 1, 1, 0, 0, 0, 0, true);
metrics.setAvailableResourcesToQueue(Resources.createResource(100*GB, 100)); metrics.setAvailableResourcesToQueue(Resources.createResource(100*GB, 100));
metrics.setAvailableResourcesToUser(user, Resources.createResource(10*GB, 10)); metrics.setAvailableResourcesToUser(user, Resources.createResource(10*GB, 10));
@ -161,8 +161,8 @@ public class TestQueueMetrics {
checkResources(userSource, 0, 0, 0, 0, 0, 10*GB, 10, 15*GB, 15, 5, 0, 0, 0); checkResources(userSource, 0, 0, 0, 0, 0, 10*GB, 10, 15*GB, 15, 5, 0, 0, 0);
metrics.incrAppsRunning(app, user); metrics.incrAppsRunning(app, user);
checkApps(queueSource, 1, 0, 1, 0, 0, 0); checkApps(queueSource, 1, 0, 1, 0, 0, 0, true);
checkApps(userSource, 1, 0, 1, 0, 0, 0); checkApps(userSource, 1, 0, 1, 0, 0, 0, true);
metrics.allocateResources(user, 3, Resources.createResource(2*GB, 2)); metrics.allocateResources(user, 3, Resources.createResource(2*GB, 2));
checkResources(queueSource, 6*GB, 6, 3, 3, 0, 100*GB, 100, 9*GB, 9, 2, 0, 0, 0); checkResources(queueSource, 6*GB, 6, 3, 3, 0, 100*GB, 100, 9*GB, 9, 2, 0, 0, 0);
@ -173,8 +173,8 @@ public class TestQueueMetrics {
checkResources(userSource, 4*GB, 4, 2, 3, 1, 10*GB, 10, 9*GB, 9, 2, 0, 0, 0); checkResources(userSource, 4*GB, 4, 2, 3, 1, 10*GB, 10, 9*GB, 9, 2, 0, 0, 0);
metrics.finishApp(app, RMAppAttemptState.FINISHED); metrics.finishApp(app, RMAppAttemptState.FINISHED);
checkApps(queueSource, 1, 0, 0, 1, 0, 0); checkApps(queueSource, 1, 0, 0, 1, 0, 0, true);
checkApps(userSource, 1, 0, 0, 1, 0, 0); checkApps(userSource, 1, 0, 0, 1, 0, 0, true);
} }
@Test public void testTwoLevelWithUserMetrics() { @Test public void testTwoLevelWithUserMetrics() {
@ -196,10 +196,10 @@ public class TestQueueMetrics {
MetricsSource userSource = userSource(ms, leafQueueName, user); MetricsSource userSource = userSource(ms, leafQueueName, user);
MetricsSource parentUserSource = userSource(ms, parentQueueName, user); MetricsSource parentUserSource = userSource(ms, parentQueueName, user);
checkApps(queueSource, 1, 1, 0, 0, 0, 0); checkApps(queueSource, 1, 1, 0, 0, 0, 0, true);
checkApps(parentQueueSource, 1, 1, 0, 0, 0, 0); checkApps(parentQueueSource, 1, 1, 0, 0, 0, 0, true);
checkApps(userSource, 1, 1, 0, 0, 0, 0); checkApps(userSource, 1, 1, 0, 0, 0, 0, true);
checkApps(parentUserSource, 1, 1, 0, 0, 0, 0); checkApps(parentUserSource, 1, 1, 0, 0, 0, 0, true);
parentMetrics.setAvailableResourcesToQueue(Resources.createResource(100*GB, 100)); parentMetrics.setAvailableResourcesToQueue(Resources.createResource(100*GB, 100));
metrics.setAvailableResourcesToQueue(Resources.createResource(100*GB, 100)); metrics.setAvailableResourcesToQueue(Resources.createResource(100*GB, 100));
@ -212,8 +212,8 @@ public class TestQueueMetrics {
checkResources(parentUserSource, 0, 0, 0, 0, 0, 10*GB, 10, 15*GB, 15, 5, 0, 0, 0); checkResources(parentUserSource, 0, 0, 0, 0, 0, 10*GB, 10, 15*GB, 15, 5, 0, 0, 0);
metrics.incrAppsRunning(app, user); metrics.incrAppsRunning(app, user);
checkApps(queueSource, 1, 0, 1, 0, 0, 0); checkApps(queueSource, 1, 0, 1, 0, 0, 0, true);
checkApps(userSource, 1, 0, 1, 0, 0, 0); checkApps(userSource, 1, 0, 1, 0, 0, 0, true);
metrics.allocateResources(user, 3, Resources.createResource(2*GB, 2)); metrics.allocateResources(user, 3, Resources.createResource(2*GB, 2));
metrics.reserveResource(user, Resources.createResource(3*GB, 3)); metrics.reserveResource(user, Resources.createResource(3*GB, 3));
@ -232,10 +232,10 @@ public class TestQueueMetrics {
checkResources(parentUserSource, 4*GB, 4, 2, 3, 1, 10*GB, 10, 9*GB, 9, 2, 0, 0, 0); checkResources(parentUserSource, 4*GB, 4, 2, 3, 1, 10*GB, 10, 9*GB, 9, 2, 0, 0, 0);
metrics.finishApp(app, RMAppAttemptState.FINISHED); metrics.finishApp(app, RMAppAttemptState.FINISHED);
checkApps(queueSource, 1, 0, 0, 1, 0, 0); checkApps(queueSource, 1, 0, 0, 1, 0, 0, true);
checkApps(parentQueueSource, 1, 0, 0, 1, 0, 0); checkApps(parentQueueSource, 1, 0, 0, 1, 0, 0, true);
checkApps(userSource, 1, 0, 0, 1, 0, 0); checkApps(userSource, 1, 0, 0, 1, 0, 0, true);
checkApps(parentUserSource, 1, 0, 0, 1, 0, 0); checkApps(parentUserSource, 1, 0, 0, 1, 0, 0, true);
} }
@Test @Test
@ -275,13 +275,35 @@ public class TestQueueMetrics {
FifoScheduler.class, ResourceScheduler.class); FifoScheduler.class, ResourceScheduler.class);
MockRM rm = new MockRM(conf); MockRM rm = new MockRM(conf);
QueueMetrics metrics = rm.getResourceScheduler().getRootQueueMetrics(); QueueMetrics metrics = rm.getResourceScheduler().getRootQueueMetrics();
checkApps(metrics, 0, 0, 0, 0, 0, 0); checkApps(metrics, 0, 0, 0, 0, 0, 0, true);
MetricsAsserts.assertGauge("ReservedContainers", 0, metrics); MetricsAsserts.assertGauge("ReservedContainers", 0, metrics);
} }
// This is to test all metrics can consistently show up if specified true to
// collect all metrics, even though they are not modified from last time they
// are collected. If not collecting all metrics, only modified metrics will show up.
@Test
public void testCollectAllMetrics() {
String queueName = "single";
QueueMetrics.forQueue(ms, queueName, null, false, conf);
MetricsSource queueSource = queueSource(ms, queueName);
checkApps(queueSource, 0, 0, 0, 0, 0, 0, true);
try {
// do not collect all metrics
checkApps(queueSource, 0, 0, 0, 0, 0, 0, false);
Assert.fail();
} catch (AssertionError e) {
Assert.assertTrue(e.getMessage().contains(
"Expected exactly one metric for name "));
}
// collect all metrics
checkApps(queueSource, 0, 0, 0, 0, 0, 0, true);
}
public static void checkApps(MetricsSource source, int submitted, int pending, public static void checkApps(MetricsSource source, int submitted, int pending,
int running, int completed, int failed, int killed) { int running, int completed, int failed, int killed, boolean all) {
MetricsRecordBuilder rb = getMetrics(source); MetricsRecordBuilder rb = getMetrics(source, all);
assertCounter("AppsSubmitted", submitted, rb); assertCounter("AppsSubmitted", submitted, rb);
assertGauge("AppsPending", pending, rb); assertGauge("AppsPending", pending, rb);
assertGauge("AppsRunning", running, rb); assertGauge("AppsRunning", running, rb);