YARN-1043. Push all metrics consistently. Contributed by Jian He.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1512081 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Arun Murthy 2013-08-08 22:54:31 +00:00
parent 81f9786ae6
commit e90afcc971
5 changed files with 56 additions and 46 deletions

View File

@ -381,7 +381,7 @@ public class MetricsSystemImpl extends MetricsSystem implements MetricsSource {
private void snapshotMetrics(MetricsSourceAdapter sa,
MetricsBufferBuilder bufferBuilder) {
long startTime = Time.now();
bufferBuilder.add(sa.name(), sa.getMetrics(collector, false));
bufferBuilder.add(sa.name(), sa.getMetrics(collector, true));
collector.clear();
snapshotStat.add(Time.now() - startTime);
LOG.debug("Snapshotted source "+ sa.name());

View File

@ -860,6 +860,8 @@ Release 2.1.0-beta - 2013-08-06
YARN-909. Disable TestLinuxContainerExecutorWithMocks on Windows. (Chuan Liu
via cnauroth)
YARN-1043. Push all metrics consistently. (Jian He via acmurthy)
Release 2.0.5-alpha - 06/06/2013
INCOMPATIBLE CHANGES

View File

@ -126,18 +126,6 @@ public class QueueMetrics implements MetricsSource {
enableUserMetrics, conf);
}
// This method is here because we want to make sure these metrics show up on
// queue registration.
// Every call below adds zero, so no metric value changes; the calls exist only
// to touch each application counter/gauge and the reserved-containers gauge.
// NOTE(review): presumably touching a metric marks it as modified so that it
// is included in the next collection even when only modified metrics are
// reported -- confirm against the metrics library.
public void initMetrics() {
appsSubmitted.incr(0);
appsRunning.incr(0);
appsPending.incr(0);
appsCompleted.incr(0);
appsKilled.incr(0);
appsFailed.incr(0);
reservedContainers.incr(0);
}
/**
* Helper method to clear cache - used only for unit tests.
*/
@ -168,7 +156,6 @@ public class QueueMetrics implements MetricsSource {
ms.register(
sourceName(queueName).toString(),
"Metrics for queue: " + queueName, metrics);
metrics.initMetrics();
}
queueMetrics.put(queueName, metrics);
}

View File

@ -94,7 +94,6 @@ public class FSQueueMetrics extends QueueMetrics {
metrics = ms.register(
sourceName(queueName).toString(),
"Metrics for queue: " + queueName, metrics);
metrics.initMetrics();
}
queueMetrics.put(queueName, metrics);
}

View File

@ -68,7 +68,7 @@ public class TestQueueMetrics {
metrics.submitApp(user, 1);
MetricsSource userSource = userSource(ms, queueName, user);
checkApps(queueSource, 1, 1, 0, 0, 0, 0);
checkApps(queueSource, 1, 1, 0, 0, 0, 0, true);
metrics.setAvailableResourcesToQueue(Resources.createResource(100*GB, 100));
metrics.incrPendingResources(user, 5, Resources.createResource(15*GB, 15));
@ -77,7 +77,7 @@ public class TestQueueMetrics {
checkResources(queueSource, 0, 0, 0, 0, 0, 100*GB, 100, 15*GB, 15, 5, 0, 0, 0);
metrics.incrAppsRunning(app, user);
checkApps(queueSource, 1, 0, 1, 0, 0, 0);
checkApps(queueSource, 1, 0, 1, 0, 0, 0, true);
metrics.allocateResources(user, 3, Resources.createResource(2*GB, 2));
checkResources(queueSource, 6*GB, 6, 3, 3, 0, 100*GB, 100, 9*GB, 9, 2, 0, 0, 0);
@ -86,7 +86,7 @@ public class TestQueueMetrics {
checkResources(queueSource, 4*GB, 4, 2, 3, 1, 100*GB, 100, 9*GB, 9, 2, 0, 0, 0);
metrics.finishApp(app, RMAppAttemptState.FINISHED);
checkApps(queueSource, 1, 0, 0, 1, 0, 0);
checkApps(queueSource, 1, 0, 0, 1, 0, 0, true);
assertNull(userSource);
}
@ -102,37 +102,37 @@ public class TestQueueMetrics {
metrics.submitApp(user, 1);
MetricsSource userSource = userSource(ms, queueName, user);
checkApps(queueSource, 1, 1, 0, 0, 0, 0);
checkApps(queueSource, 1, 1, 0, 0, 0, 0, true);
metrics.incrAppsRunning(app, user);
checkApps(queueSource, 1, 0, 1, 0, 0, 0);
checkApps(queueSource, 1, 0, 1, 0, 0, 0, true);
metrics.finishApp(app, RMAppAttemptState.FAILED);
checkApps(queueSource, 1, 0, 0, 0, 1, 0);
checkApps(queueSource, 1, 0, 0, 0, 1, 0, true);
// As the application has failed, framework retries the same application
// based on configuration
metrics.submitApp(user, 2);
checkApps(queueSource, 1, 1, 0, 0, 0, 0);
checkApps(queueSource, 1, 1, 0, 0, 0, 0, true);
metrics.incrAppsRunning(app, user);
checkApps(queueSource, 1, 0, 1, 0, 0, 0);
checkApps(queueSource, 1, 0, 1, 0, 0, 0, true);
// Suppose the application has failed this time as well.
metrics.finishApp(app, RMAppAttemptState.FAILED);
checkApps(queueSource, 1, 0, 0, 0, 1, 0);
checkApps(queueSource, 1, 0, 0, 0, 1, 0, true);
// As the application has failed, framework retries the same application
// based on configuration
metrics.submitApp(user, 3);
checkApps(queueSource, 1, 1, 0, 0, 0, 0);
checkApps(queueSource, 1, 1, 0, 0, 0, 0, true);
metrics.incrAppsRunning(app, user);
checkApps(queueSource, 1, 0, 1, 0, 0, 0);
checkApps(queueSource, 1, 0, 1, 0, 0, 0, true);
// Suppose the application has finished.
metrics.finishApp(app, RMAppAttemptState.FINISHED);
checkApps(queueSource, 1, 0, 0, 1, 0, 0);
checkApps(queueSource, 1, 0, 0, 1, 0, 0, true);
assertNull(userSource);
}
@ -149,8 +149,8 @@ public class TestQueueMetrics {
metrics.submitApp(user, 1);
MetricsSource userSource = userSource(ms, queueName, user);
checkApps(queueSource, 1, 1, 0, 0, 0, 0);
checkApps(userSource, 1, 1, 0, 0, 0, 0);
checkApps(queueSource, 1, 1, 0, 0, 0, 0, true);
checkApps(userSource, 1, 1, 0, 0, 0, 0, true);
metrics.setAvailableResourcesToQueue(Resources.createResource(100*GB, 100));
metrics.setAvailableResourcesToUser(user, Resources.createResource(10*GB, 10));
@ -161,8 +161,8 @@ public class TestQueueMetrics {
checkResources(userSource, 0, 0, 0, 0, 0, 10*GB, 10, 15*GB, 15, 5, 0, 0, 0);
metrics.incrAppsRunning(app, user);
checkApps(queueSource, 1, 0, 1, 0, 0, 0);
checkApps(userSource, 1, 0, 1, 0, 0, 0);
checkApps(queueSource, 1, 0, 1, 0, 0, 0, true);
checkApps(userSource, 1, 0, 1, 0, 0, 0, true);
metrics.allocateResources(user, 3, Resources.createResource(2*GB, 2));
checkResources(queueSource, 6*GB, 6, 3, 3, 0, 100*GB, 100, 9*GB, 9, 2, 0, 0, 0);
@ -173,8 +173,8 @@ public class TestQueueMetrics {
checkResources(userSource, 4*GB, 4, 2, 3, 1, 10*GB, 10, 9*GB, 9, 2, 0, 0, 0);
metrics.finishApp(app, RMAppAttemptState.FINISHED);
checkApps(queueSource, 1, 0, 0, 1, 0, 0);
checkApps(userSource, 1, 0, 0, 1, 0, 0);
checkApps(queueSource, 1, 0, 0, 1, 0, 0, true);
checkApps(userSource, 1, 0, 0, 1, 0, 0, true);
}
@Test public void testTwoLevelWithUserMetrics() {
@ -196,10 +196,10 @@ public class TestQueueMetrics {
MetricsSource userSource = userSource(ms, leafQueueName, user);
MetricsSource parentUserSource = userSource(ms, parentQueueName, user);
checkApps(queueSource, 1, 1, 0, 0, 0, 0);
checkApps(parentQueueSource, 1, 1, 0, 0, 0, 0);
checkApps(userSource, 1, 1, 0, 0, 0, 0);
checkApps(parentUserSource, 1, 1, 0, 0, 0, 0);
checkApps(queueSource, 1, 1, 0, 0, 0, 0, true);
checkApps(parentQueueSource, 1, 1, 0, 0, 0, 0, true);
checkApps(userSource, 1, 1, 0, 0, 0, 0, true);
checkApps(parentUserSource, 1, 1, 0, 0, 0, 0, true);
parentMetrics.setAvailableResourcesToQueue(Resources.createResource(100*GB, 100));
metrics.setAvailableResourcesToQueue(Resources.createResource(100*GB, 100));
@ -212,8 +212,8 @@ public class TestQueueMetrics {
checkResources(parentUserSource, 0, 0, 0, 0, 0, 10*GB, 10, 15*GB, 15, 5, 0, 0, 0);
metrics.incrAppsRunning(app, user);
checkApps(queueSource, 1, 0, 1, 0, 0, 0);
checkApps(userSource, 1, 0, 1, 0, 0, 0);
checkApps(queueSource, 1, 0, 1, 0, 0, 0, true);
checkApps(userSource, 1, 0, 1, 0, 0, 0, true);
metrics.allocateResources(user, 3, Resources.createResource(2*GB, 2));
metrics.reserveResource(user, Resources.createResource(3*GB, 3));
@ -232,10 +232,10 @@ public class TestQueueMetrics {
checkResources(parentUserSource, 4*GB, 4, 2, 3, 1, 10*GB, 10, 9*GB, 9, 2, 0, 0, 0);
metrics.finishApp(app, RMAppAttemptState.FINISHED);
checkApps(queueSource, 1, 0, 0, 1, 0, 0);
checkApps(parentQueueSource, 1, 0, 0, 1, 0, 0);
checkApps(userSource, 1, 0, 0, 1, 0, 0);
checkApps(parentUserSource, 1, 0, 0, 1, 0, 0);
checkApps(queueSource, 1, 0, 0, 1, 0, 0, true);
checkApps(parentQueueSource, 1, 0, 0, 1, 0, 0, true);
checkApps(userSource, 1, 0, 0, 1, 0, 0, true);
checkApps(parentUserSource, 1, 0, 0, 1, 0, 0, true);
}
@Test
@ -275,13 +275,35 @@ public class TestQueueMetrics {
FifoScheduler.class, ResourceScheduler.class);
MockRM rm = new MockRM(conf);
QueueMetrics metrics = rm.getResourceScheduler().getRootQueueMetrics();
checkApps(metrics, 0, 0, 0, 0, 0, 0);
checkApps(metrics, 0, 0, 0, 0, 0, 0, true);
MetricsAsserts.assertGauge("ReservedContainers", 0, metrics);
}
// Verifies that every metric consistently shows up when the caller asks to
// collect all metrics, even if none has been modified since the last
// collection. When not collecting all metrics, only modified metrics show up,
// so looking up an untouched metric is expected to fail.
@Test
public void testCollectAllMetrics() {
  String queueName = "single";
  QueueMetrics.forQueue(ms, queueName, null, false, conf);
  MetricsSource queueSource = queueSource(ms, queueName);

  checkApps(queueSource, 0, 0, 0, 0, 0, 0, true);

  // Track the expected failure with a flag rather than calling Assert.fail()
  // inside the try block: a no-argument fail() throws an AssertionError whose
  // message is null, which the catch below would intercept and then turn into
  // a NullPointerException when inspecting e.getMessage().
  boolean partialCollectionFailed = false;
  try {
    // do not collect all metrics
    checkApps(queueSource, 0, 0, 0, 0, 0, 0, false);
  } catch (AssertionError e) {
    partialCollectionFailed = true;
    String message = e.getMessage();
    Assert.assertTrue("Unexpected assertion failure: " + message,
        message != null
            && message.contains("Expected exactly one metric for name "));
  }
  Assert.assertTrue(
      "Expected checkApps to fail when not collecting all metrics",
      partialCollectionFailed);

  // collect all metrics
  checkApps(queueSource, 0, 0, 0, 0, 0, 0, true);
}
public static void checkApps(MetricsSource source, int submitted, int pending,
int running, int completed, int failed, int killed) {
MetricsRecordBuilder rb = getMetrics(source);
int running, int completed, int failed, int killed, boolean all) {
MetricsRecordBuilder rb = getMetrics(source, all);
assertCounter("AppsSubmitted", submitted, rb);
assertGauge("AppsPending", pending, rb);
assertGauge("AppsRunning", running, rb);