MAPREDUCE-3708. Metrics: Incorrect Apps Submitted Count (Bhallamudi via mahadev) - Merging r1239954 from trunk.
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-0.23@1239957 13f79535-47bb-0310-9956-ffa450edef68
commit a85928e7b5
parent 3d5c443a7d
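QueueMetrics.submitApp() was previously invoked once per application attempt, so every ApplicationMaster retry inflated the "Apps Submitted" count. The hunks below guard the call so that only the first attempt (attempt id 1) of an application is counted. A minimal, self-contained sketch of the effect, using hypothetical stand-ins rather than the Hadoop classes (appsSubmitted stands in for QueueMetrics.getAppsSubmitted(), submitAttempt for the scheduler submit path):

    // Sketch with simplified stand-ins, not the Hadoop code itself.
    public class AppsSubmittedCountSketch {
      static int appsSubmitted = 0;

      static void submitAttempt(int attemptId) {
        // Attempts after the first are retries of the same application,
        // so they must not be counted as new submissions.
        if (attemptId == 1) {
          appsSubmitted++;
        }
      }

      public static void main(String[] args) {
        submitAttempt(1); // first attempt of the application
        submitAttempt(2); // AM retry of the same application
        System.out.println(appsSubmitted); // prints 1, not 2
      }
    }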
@@ -618,6 +618,9 @@ Release 0.23.1 - Unreleased
     MAPREDUCE-3780. Fixed a bug where applications killed before getting
     activated were not getting cleaned up properly. (Hitesh Shah via acmurthy)
 
+    MAPREDUCE-3708. Metrics: Incorrect Apps Submitted Count (Bhallamudi via
+    mahadev)
+
 Release 0.23.0 - 2011-11-01
 
   INCOMPATIBLE CHANGES
@@ -606,7 +606,10 @@ public class LeafQueue implements CSQueue {
       addApplication(application, user);
     }
 
-    metrics.submitApp(userName);
+    int attemptId = application.getApplicationAttemptId().getAttemptId();
+    if (attemptId == 1) {
+      metrics.submitApp(userName);
+    }
 
     // Inform the parent queue
     try {
@@ -298,7 +298,9 @@ public class FifoScheduler implements ResourceScheduler {
         new SchedulerApp(appAttemptId, user, DEFAULT_QUEUE, activeUsersManager,
             this.rmContext, null);
     applications.put(appAttemptId, schedulerApp);
-    metrics.submitApp(user);
+    if (appAttemptId.getAttemptId() == 1) {
+      metrics.submitApp(user);
+    }
     LOG.info("Application Submission: " + appAttemptId.getApplicationId() +
         " from " + user + ", currently active: " + applications.size());
     rmContext.getDispatcher().getEventHandler().handle(
@@ -59,6 +59,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEven
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeImpl;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ActiveUsersManager;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApp;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode;
 import org.junit.After;
@@ -92,6 +93,7 @@ public class TestLeafQueue {
 
     csConf =
         new CapacitySchedulerConfiguration();
+    csConf.setBoolean("yarn.scheduler.capacity.user-metrics.enable", true);
     setupQueueConfiguration(csConf);
 
 
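The line added above enables per-user queue metrics; the flag string is copied verbatim from the hunk, and the hedged assumption is that without it the CapacityScheduler does not populate the per-user QueueMetrics that the new test asserts on. A small sketch using only the generic Hadoop Configuration API:

    // Sketch: set and read back the flag with org.apache.hadoop.conf.Configuration.
    // Its effect on per-user metrics is an assumption inferred from the test's use
    // of getUserMetrics(user_0) below.
    import org.apache.hadoop.conf.Configuration;

    public class UserMetricsFlagSketch {
      public static void main(String[] args) {
        Configuration conf = new Configuration();
        conf.setBoolean("yarn.scheduler.capacity.user-metrics.enable", true);
        System.out.println(
            conf.getBoolean("yarn.scheduler.capacity.user-metrics.enable", false)); // true
      }
    }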
@@ -258,6 +260,35 @@ public class TestLeafQueue {
     assertEquals(7*GB, a.getMetrics().getAvailableMB());
   }
 
+  @Test
+  public void testAppAttemptMetrics() throws Exception {
+
+    // Manipulate queue 'a'
+    LeafQueue a = stubLeafQueue((LeafQueue) queues.get(B));
+
+    // Users
+    final String user_0 = "user_0";
+
+    // Submit applications
+    final ApplicationAttemptId appAttemptId_0 = TestUtils
+        .getMockApplicationAttemptId(0, 1);
+    SchedulerApp app_0 = new SchedulerApp(appAttemptId_0, user_0, a, null,
+        rmContext, null);
+    a.submitApplication(app_0, user_0, B);
+
+    // Attempt the same application again
+    final ApplicationAttemptId appAttemptId_1 = TestUtils
+        .getMockApplicationAttemptId(0, 2);
+    SchedulerApp app_1 = new SchedulerApp(appAttemptId_1, user_0, a, null,
+        rmContext, null);
+    a.submitApplication(app_1, user_0, B); // same user
+
+    assertEquals(1, a.getMetrics().getAppsSubmitted());
+    assertEquals(1, a.getMetrics().getAppsPending());
+
+    QueueMetrics userMetrics = a.getMetrics().getUserMetrics(user_0);
+    assertEquals(1, userMetrics.getAppsSubmitted());
+  }
+
   @Test
   public void testSingleQueueWithOneUser() throws Exception {
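The test above submits two attempts of the same application and checks that both the queue-level and the per-user counters stay at 1. A hedged sketch of that accounting with hypothetical stand-ins (not the Hadoop QueueMetrics class), mirroring those assertions:

    // Sketch: the guarded submit keeps the queue-wide counter and the per-user
    // counter at 1 across an AM retry, matching testAppAttemptMetrics above.
    import java.util.HashMap;
    import java.util.Map;

    public class PerUserSubmitSketch {
      static int queueAppsSubmitted = 0;
      static Map<String, Integer> userAppsSubmitted = new HashMap<String, Integer>();

      static void submitApp(String user, int attemptId) {
        if (attemptId != 1) {
          return; // a retry is not a new submission
        }
        queueAppsSubmitted++;
        Integer current = userAppsSubmitted.get(user);
        userAppsSubmitted.put(user, current == null ? 1 : current + 1);
      }

      public static void main(String[] args) {
        submitApp("user_0", 1); // first attempt
        submitApp("user_0", 2); // retry by the same user
        System.out.println(queueAppsSubmitted);              // 1
        System.out.println(userAppsSubmitted.get("user_0")); // 1
      }
    }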
@@ -26,17 +26,28 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.net.NetworkTopology;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.Priority;
 import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.event.AsyncDispatcher;
 import org.apache.hadoop.yarn.server.resourcemanager.Application;
+import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
+import org.apache.hadoop.yarn.server.resourcemanager.RMContextImpl;
 import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
 import org.apache.hadoop.yarn.server.resourcemanager.Task;
 import org.apache.hadoop.yarn.server.resourcemanager.recovery.Store;
 import org.apache.hadoop.yarn.server.resourcemanager.recovery.StoreFactory;
 import org.apache.hadoop.yarn.server.resourcemanager.resource.Resources;
+import org.apache.hadoop.yarn.server.resourcemanager.resourcetracker.InlineDispatcher;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAddedSchedulerEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent;
+import org.apache.hadoop.yarn.util.BuilderUtils;
 import org.junit.After;
 import org.junit.Before;
+import org.junit.Test;
 
 public class TestFifoScheduler {
   private static final Log LOG = LogFactory.getLog(TestFifoScheduler.class);
@@ -63,7 +74,30 @@ public class TestFifoScheduler {
         .getRMContext());
   }
 
+  @Test
+  public void testAppAttemptMetrics() throws Exception {
+    AsyncDispatcher dispatcher = new InlineDispatcher();
+    RMContext rmContext = new RMContextImpl(null, dispatcher, null, null, null);
+
+    FifoScheduler schedular = new FifoScheduler();
+    schedular.reinitialize(new Configuration(), null, rmContext);
+
+    ApplicationId appId = BuilderUtils.newApplicationId(200, 1);
+    ApplicationAttemptId appAttemptId = BuilderUtils.newApplicationAttemptId(
+        appId, 1);
+
+    SchedulerEvent event = new AppAddedSchedulerEvent(appAttemptId, "queue",
+        "user");
+    schedular.handle(event);
+
+    appAttemptId = BuilderUtils.newApplicationAttemptId(appId, 2);
+
+    event = new AppAddedSchedulerEvent(appAttemptId, "queue", "user");
+    schedular.handle(event);
+
+    QueueMetrics metrics = schedular.getRootQueueMetrics();
+    Assert.assertEquals(1, metrics.getAppsSubmitted());
+  }
+
   // @Test
   public void testFifoScheduler() throws Exception {