YARN-3273. Improve scheduler UI to facilitate scheduling analysis and debugging. Contributed Rohith Sharmaks

This commit is contained in:
Jian He 2015-03-17 21:28:58 -07:00
parent 3bc72cc16d
commit 658097d6da
21 changed files with 371 additions and 62 deletions

View File

@ -59,6 +59,9 @@ Release 2.8.0 - UNRELEASED
YARN-3243. CapacityScheduler should pass headroom from parent to children
to make sure ParentQueue obey its capacity limits. (Wangda Tan via jianhe)
YARN-3273. Improve scheduler UI to facilitate scheduling analysis and
debugging. (Rohith Sharmaks via jianhe)
OPTIMIZATIONS
YARN-3339. TestDockerContainerExecutor should pull a single image and not

View File

@ -172,7 +172,7 @@ public Collection<ContainerReport> run() throws Exception {
._("Diagnostics Info:", appAttempt.getDiagnosticsInfo() == null ?
"" : appAttempt.getDiagnosticsInfo());
html._(InfoBlock.class);
if (exceptionWhenGetContainerReports) {
html
@ -183,6 +183,19 @@ public Collection<ContainerReport> run() throws Exception {
return;
}
// TODO need to render applicationHeadRoom value from
// ApplicationAttemptMetrics after YARN-3284
if (webUiType.equals(YarnWebParams.RM_WEB_UI)) {
if (!isApplicationInFinalState(appAttempt.getAppAttemptState())) {
DIV<Hamlet> pdiv = html._(InfoBlock.class).div(_INFO_WRAP);
info("Application Attempt Overview").clear();
info("Application Attempt Metrics")._(
"Application Attempt Headroom : ", 0);
pdiv._();
}
}
html._(InfoBlock.class);
// Container Table
TBODY<TABLE<Hamlet>> tbody =
html.table("#containers").thead().tr().th(".id", "Container ID")
@ -273,4 +286,10 @@ private boolean hasAMContainer(ContainerId containerId,
}
return false;
}
private boolean isApplicationInFinalState(YarnApplicationAttemptState state) {
return state == YarnApplicationAttemptState.FINISHED
|| state == YarnApplicationAttemptState.FAILED
|| state == YarnApplicationAttemptState.KILLED;
}
}

View File

@ -41,6 +41,8 @@ public class RMAppAttemptMetrics {
private ApplicationAttemptId attemptId = null;
// preemption info
private Resource resourcePreempted = Resource.newInstance(0, 0);
// application headroom
private volatile Resource applicationHeadroom = Resource.newInstance(0, 0);
private AtomicInteger numNonAMContainersPreempted = new AtomicInteger(0);
private AtomicBoolean isPreempted = new AtomicBoolean(false);
@ -145,4 +147,12 @@ public int[][] getLocalityStatistics() {
public int getTotalAllocatedContainers() {
return this.totalAllocatedContainers;
}
public Resource getApplicationAttemptHeadroom() {
return applicationHeadroom;
}
public void setApplicationAttemptHeadRoom(Resource headRoom) {
this.applicationHeadroom = headRoom;
}
}

View File

@ -632,4 +632,14 @@ public void incNumAllocatedContainers(NodeType containerType,
requestType);
}
}
public void setApplicationHeadroomForMetrics(Resource headroom) {
RMAppAttempt attempt =
rmContext.getRMApps().get(attemptId.getApplicationId())
.getCurrentAppAttempt();
if (attempt != null) {
attempt.getRMAppAttemptMetrics().setApplicationAttemptHeadRoom(
Resources.clone(headroom));
}
}
}

View File

@ -419,10 +419,13 @@ public synchronized User getUser(String userName) {
*/
public synchronized ArrayList<UserInfo> getUsers() {
ArrayList<UserInfo> usersToReturn = new ArrayList<UserInfo>();
for (Map.Entry<String, User> entry: users.entrySet()) {
usersToReturn.add(new UserInfo(entry.getKey(), Resources.clone(
entry.getValue().getUsed()), entry.getValue().getActiveApplications(),
entry.getValue().getPendingApplications()));
for (Map.Entry<String, User> entry : users.entrySet()) {
User user = entry.getValue();
usersToReturn.add(new UserInfo(entry.getKey(), Resources.clone(user
.getUsed()), user.getActiveApplications(), user
.getPendingApplications(), Resources.clone(user
.getConsumedAMResources()), Resources.clone(user
.getUserResourceLimit())));
}
return usersToReturn;
}
@ -1068,7 +1071,7 @@ private Resource computeUserLimit(FiCaSchedulerApp application,
" clusterCapacity: " + clusterResource
);
}
user.setUserResourceLimit(limit);
return limit;
}
@ -1738,6 +1741,7 @@ resourceCalculator, this, getParent(), clusterResource,
@VisibleForTesting
public static class User {
ResourceUsage userResourceUsage = new ResourceUsage();
volatile Resource userResourceLimit = Resource.newInstance(0, 0);
int pendingApplications = 0;
int activeApplications = 0;
@ -1807,6 +1811,14 @@ public void releaseContainer(Resource resource, Set<String> nodeLabels) {
}
}
}
public Resource getUserResourceLimit() {
return userResourceLimit;
}
public void setUserResourceLimit(Resource userResourceLimit) {
this.userResourceLimit = userResourceLimit;
}
}
@Override

View File

@ -32,14 +32,19 @@ public class UserInfo {
protected ResourceInfo resourcesUsed;
protected int numPendingApplications;
protected int numActiveApplications;
protected ResourceInfo AMResourceUsed;
protected ResourceInfo userResourceLimit;
UserInfo() {}
UserInfo(String username, Resource resUsed, int activeApps, int pendingApps) {
UserInfo(String username, Resource resUsed, int activeApps, int pendingApps,
Resource amResUsed, Resource resourceLimit) {
this.username = username;
this.resourcesUsed = new ResourceInfo(resUsed);
this.numActiveApplications = activeApps;
this.numPendingApplications = pendingApps;
this.AMResourceUsed = new ResourceInfo(amResUsed);
this.userResourceLimit = new ResourceInfo(resourceLimit);
}
public String getUsername() {
@ -57,4 +62,12 @@ public int getNumPendingApplications() {
public int getNumActiveApplications() {
return numActiveApplications;
}
public ResourceInfo getAMResourcesUsed() {
return AMResourceUsed;
}
public ResourceInfo getUserResourceLimit() {
return userResourceLimit;
}
}

View File

@ -268,7 +268,9 @@ public synchronized Allocation getAllocation(ResourceCalculator rc,
minimumAllocation, numCont);
ContainersAndNMTokensAllocation allocation =
pullNewlyAllocatedContainersAndNMTokens();
return new Allocation(allocation.getContainerList(), getHeadroom(), null,
Resource headroom = getHeadroom();
setApplicationHeadroomForMetrics(headroom);
return new Allocation(allocation.getContainerList(), headroom, null,
currentContPreemption, Collections.singletonList(rr),
allocation.getNMTokenList());
}

View File

@ -938,9 +938,10 @@ clusterResource, minimumAllocation, getMaximumResourceCapability(),
application.updateBlacklist(blacklistAdditions, blacklistRemovals);
ContainersAndNMTokensAllocation allocation =
application.pullNewlyAllocatedContainersAndNMTokens();
return new Allocation(allocation.getContainerList(),
application.getHeadroom(), preemptionContainerIds, null, null,
allocation.getNMTokenList());
Resource headroom = application.getHeadroom();
application.setApplicationHeadroomForMetrics(headroom);
return new Allocation(allocation.getContainerList(), headroom,
preemptionContainerIds, null, null, allocation.getNMTokenList());
}
}

View File

@ -343,9 +343,10 @@ public Allocation allocate(
application.updateBlacklist(blacklistAdditions, blacklistRemovals);
ContainersAndNMTokensAllocation allocation =
application.pullNewlyAllocatedContainersAndNMTokens();
return new Allocation(allocation.getContainerList(),
application.getHeadroom(), null, null, null,
allocation.getNMTokenList());
Resource headroom = application.getHeadroom();
application.setApplicationHeadroomForMetrics(headroom);
return new Allocation(allocation.getContainerList(), headroom, null,
null, null, allocation.getNMTokenList());
}
}

View File

@ -37,6 +37,8 @@
import org.apache.hadoop.yarn.webapp.hamlet.Hamlet;
import org.apache.hadoop.yarn.webapp.hamlet.Hamlet.DIV;
import org.apache.hadoop.yarn.webapp.hamlet.Hamlet.LI;
import org.apache.hadoop.yarn.webapp.hamlet.Hamlet.TABLE;
import org.apache.hadoop.yarn.webapp.hamlet.Hamlet.TBODY;
import org.apache.hadoop.yarn.webapp.hamlet.Hamlet.UL;
import org.apache.hadoop.yarn.webapp.view.HtmlBlock;
import org.apache.hadoop.yarn.webapp.view.InfoBlock;
@ -67,41 +69,8 @@ static class LeafQueueInfoBlock extends HtmlBlock {
lqinfo = (CapacitySchedulerLeafQueueInfo) info.qinfo;
}
//Return a string describing one resource as a percentage of another
private String getPercentage(ResourceInfo numerator, ResourceInfo denominator) {
StringBuilder percentString = new StringBuilder("Memory: ");
if (numerator != null) {
percentString.append(numerator.getMemory());
}
if (denominator.getMemory() != 0) {
percentString.append(" (<span title='of used resources in this queue'>")
.append(StringUtils.format("%.2f", numerator.getMemory() * 100.0 /
denominator.getMemory()) + "%</span>)");
}
percentString.append(", vCores: ");
if (numerator != null) {
percentString.append(numerator.getvCores());
}
if (denominator.getvCores() != 0) {
percentString.append(" (<span title='of used resources in this queue'>")
.append(StringUtils.format("%.2f", numerator.getvCores() * 100.0 /
denominator.getvCores()) + "%</span>)");
}
return percentString.toString();
}
@Override
protected void render(Block html) {
StringBuilder activeUserList = new StringBuilder("");
ResourceInfo usedResources = lqinfo.getResourcesUsed();
ArrayList<UserInfo> users = lqinfo.getUsers().getUsersList();
for (UserInfo entry: users) {
activeUserList.append(entry.getUsername()).append(" &lt;")
.append(getPercentage(entry.getResourcesUsed(), usedResources))
.append(", Schedulable Apps: " + entry.getNumActiveApplications())
.append(", Non-Schedulable Apps: " + entry.getNumPendingApplications())
.append("&gt;<br style='display:block'>"); //Force line break
}
ResponseInfo ri = info("\'" + lqinfo.getQueuePath().substring(5) + "\' Queue Status").
_("Queue State:", lqinfo.getQueueState()).
@ -116,12 +85,12 @@ protected void render(Block html) {
_("Max Applications:", Integer.toString(lqinfo.getMaxApplications())).
_("Max Applications Per User:", Integer.toString(lqinfo.getMaxApplicationsPerUser())).
_("Max Application Master Resources:", lqinfo.getAMResourceLimit().toString()).
_("Used Application Master Resources:", lqinfo.getUsedAMResource().toString()).
_("Max Application Master Resources Per User:", lqinfo.getUserAMResourceLimit().toString()).
_("Configured Capacity:", percent(lqinfo.getCapacity() / 100)).
_("Configured Max Capacity:", percent(lqinfo.getMaxCapacity() / 100)).
_("Configured Minimum User Limit Percent:", Integer.toString(lqinfo.getUserLimit()) + "%").
_("Configured User Limit Factor:", String.format("%.1f", lqinfo.getUserLimitFactor())).
_r("Active Users: ", activeUserList.toString()).
_("Accessible Node Labels:", StringUtils.join(",", lqinfo.getNodeLabels())).
_("Preemption:", lqinfo.getPreemptionDisabled() ? "disabled" : "enabled");
@ -132,6 +101,44 @@ protected void render(Block html) {
}
}
static class QueueUsersInfoBlock extends HtmlBlock {
final CapacitySchedulerLeafQueueInfo lqinfo;
@Inject
QueueUsersInfoBlock(ViewContext ctx, CSQInfo info) {
super(ctx);
lqinfo = (CapacitySchedulerLeafQueueInfo) info.qinfo;
}
@Override
protected void render(Block html) {
TBODY<TABLE<Hamlet>> tbody =
html.table("#userinfo").thead().$class("ui-widget-header").tr().th()
.$class("ui-state-default")._("User Name")._().th()
.$class("ui-state-default")._("Max Resource")._().th()
.$class("ui-state-default")._("Used Resource")._().th()
.$class("ui-state-default")._("Max AM Resource")._().th()
.$class("ui-state-default")._("Used AM Resource")._().th()
.$class("ui-state-default")._("Schedulable Apps")._().th()
.$class("ui-state-default")._("Non-Schedulable Apps")._()._()._()
.tbody();
ArrayList<UserInfo> users = lqinfo.getUsers().getUsersList();
for (UserInfo userInfo : users) {
tbody.tr().td(userInfo.getUsername())
.td(userInfo.getUserResourceLimit().toString())
.td(userInfo.getResourcesUsed().toString())
.td(lqinfo.getUserAMResourceLimit().toString())
.td(userInfo.getAMResourcesUsed().toString())
.td(Integer.toString(userInfo.getNumActiveApplications()))
.td(Integer.toString(userInfo.getNumPendingApplications()))._();
}
html.div().$class("usersinfo").h5("Active Users Info")._();
tbody._()._();
}
}
public static class QueueBlock extends HtmlBlock {
final CSQInfo csqinfo;
@ -166,6 +173,7 @@ public void render(Block html) {
csqinfo.qinfo = info;
if (info.getQueues() == null) {
li.ul("#lq").li()._(LeafQueueInfoBlock.class)._()._();
li.ul("#lq").li()._(QueueUsersInfoBlock.class)._()._();
} else {
li._(QueueBlock.class);
}

View File

@ -21,6 +21,7 @@
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ClusterMetricsInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.SchedulerInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.UserMetricsInfo;
import org.apache.hadoop.yarn.webapp.hamlet.Hamlet;
@ -153,6 +154,27 @@ protected void render(Block html) {
}
}
SchedulerInfo schedulerInfo=new SchedulerInfo(this.rm);
div.h3("Scheduler Metrics").
table("#schedulermetricsoverview").
thead().$class("ui-widget-header").
tr().
th().$class("ui-state-default")._("Scheduler Type")._().
th().$class("ui-state-default")._("Scheduling Resource Type")._().
th().$class("ui-state-default")._("Minimum Allocation")._().
th().$class("ui-state-default")._("Maximum Allocation")._().
_().
_().
tbody().$class("ui-widget-content").
tr().
td(String.valueOf(schedulerInfo.getSchedulerType())).
td(String.valueOf(schedulerInfo.getSchedulerResourceTypes())).
td(schedulerInfo.getMinAllocation().toString()).
td(schedulerInfo.getMaxAllocation().toString()).
_().
_()._();
div._();
}

View File

@ -35,7 +35,8 @@ public class CapacitySchedulerLeafQueueInfo extends CapacitySchedulerQueueInfo {
protected int userLimit;
protected UsersInfo users; // To add another level in the XML
protected float userLimitFactor;
protected ResourceInfo aMResourceLimit;
protected ResourceInfo AMResourceLimit;
protected ResourceInfo usedAMResource;
protected ResourceInfo userAMResourceLimit;
protected boolean preemptionDisabled;
@ -52,7 +53,8 @@ public class CapacitySchedulerLeafQueueInfo extends CapacitySchedulerQueueInfo {
userLimit = q.getUserLimit();
users = new UsersInfo(q.getUsers());
userLimitFactor = q.getUserLimitFactor();
aMResourceLimit = new ResourceInfo(q.getAMResourceLimit());
AMResourceLimit = new ResourceInfo(q.getAMResourceLimit());
usedAMResource = new ResourceInfo(q.getQueueResourceUsage().getAMUsed());
userAMResourceLimit = new ResourceInfo(q.getUserAMResourceLimit());
preemptionDisabled = q.getPreemptionDisabled();
}
@ -91,9 +93,13 @@ public float getUserLimitFactor() {
}
public ResourceInfo getAMResourceLimit() {
return aMResourceLimit;
return AMResourceLimit;
}
public ResourceInfo getUsedAMResource() {
return usedAMResource;
}
public ResourceInfo getUserAMResourceLimit() {
return userAMResourceLimit;
}

View File

@ -18,15 +18,59 @@
package org.apache.hadoop.yarn.server.resourcemanager.webapp.dao;
import java.util.EnumSet;
import javax.xml.bind.annotation.XmlRootElement;
import javax.xml.bind.annotation.XmlSeeAlso;
import org.apache.hadoop.yarn.proto.YarnServiceProtos.SchedulerResourceTypes;
import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fifo.FifoScheduler;
@XmlRootElement
@XmlSeeAlso({ CapacitySchedulerInfo.class, FairSchedulerInfo.class,
FifoSchedulerInfo.class })
public class SchedulerInfo {
protected String schedulerName;
protected ResourceInfo minAllocResource;
protected ResourceInfo maxAllocResource;
protected EnumSet<SchedulerResourceTypes> schedulingResourceTypes;
public SchedulerInfo() {
} // JAXB needs this
public SchedulerInfo(final ResourceManager rm) {
ResourceScheduler rs = rm.getResourceScheduler();
if (rs instanceof CapacityScheduler) {
this.schedulerName = "Capacity Scheduler";
} else if (rs instanceof FairScheduler) {
this.schedulerName = "Fair Scheduler";
} else if (rs instanceof FifoScheduler) {
this.schedulerName = "Fifo Scheduler";
}
this.minAllocResource = new ResourceInfo(rs.getMinimumResourceCapability());
this.maxAllocResource = new ResourceInfo(rs.getMaximumResourceCapability());
this.schedulingResourceTypes = rs.getSchedulingResourceTypes();
}
public String getSchedulerType() {
return this.schedulerName;
}
public ResourceInfo getMinAllocation() {
return this.minAllocResource;
}
public ResourceInfo getMaxAllocation() {
return this.maxAllocResource;
}
public String getSchedulerResourceTypes() {
return this.schedulingResourceTypes.toString();
}
}

View File

@ -45,7 +45,10 @@
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
import org.apache.hadoop.yarn.server.api.protocolrecords.UpdateNodeResourceRequest;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppImpl;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptImpl;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptMetrics;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation;
@ -67,7 +70,9 @@
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
public class TestFifoScheduler {
private static final Log LOG = LogFactory.getLog(TestFifoScheduler.class);
@ -374,6 +379,8 @@ public void testBlackListNodes() throws Exception {
ApplicationId appId1 = BuilderUtils.newApplicationId(100, 1);
ApplicationAttemptId appAttemptId1 = BuilderUtils.newApplicationAttemptId(
appId1, 1);
createMockRMApp(appAttemptId1, rm.getRMContext());
SchedulerEvent appEvent =
new AppAddedSchedulerEvent(appId1, "queue", "user");
fs.handle(appEvent);
@ -468,6 +475,7 @@ public void testHeadroom() throws Exception {
ApplicationId appId1 = BuilderUtils.newApplicationId(100, 1);
ApplicationAttemptId appAttemptId1 = BuilderUtils.newApplicationAttemptId(
appId1, 1);
createMockRMApp(appAttemptId1, rm.getRMContext());
SchedulerEvent appEvent =
new AppAddedSchedulerEvent(appId1, "queue", "user");
fs.handle(appEvent);
@ -478,6 +486,7 @@ public void testHeadroom() throws Exception {
ApplicationId appId2 = BuilderUtils.newApplicationId(200, 2);
ApplicationAttemptId appAttemptId2 = BuilderUtils.newApplicationAttemptId(
appId2, 1);
createMockRMApp(appAttemptId2, rm.getRMContext());
SchedulerEvent appEvent2 =
new AppAddedSchedulerEvent(appId2, "queue", "user");
fs.handle(appEvent2);
@ -604,4 +613,17 @@ public static void main(String[] args) throws Exception {
t.testNonDefaultMinimumAllocation();
t.testReconnectedNode();
}
private RMAppImpl createMockRMApp(ApplicationAttemptId attemptId,
RMContext context) {
RMAppImpl app = mock(RMAppImpl.class);
when(app.getApplicationId()).thenReturn(attemptId.getApplicationId());
RMAppAttemptImpl attempt = mock(RMAppAttemptImpl.class);
when(attempt.getAppAttemptId()).thenReturn(attemptId);
RMAppAttemptMetrics attemptMetric = mock(RMAppAttemptMetrics.class);
when(attempt.getRMAppAttemptMetrics()).thenReturn(attemptMetric);
when(app.getCurrentAppAttempt()).thenReturn(attempt);
context.getRMApps().putIfAbsent(attemptId.getApplicationId(), app);
return app;
}
}

View File

@ -89,15 +89,18 @@
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.MemoryRMStateStore;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppImpl;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppMetrics;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptImpl;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptMetrics;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplication;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt;
@ -635,6 +638,18 @@ public void testBlackListNodes() throws Exception {
ApplicationId appId = BuilderUtils.newApplicationId(100, 1);
ApplicationAttemptId appAttemptId = BuilderUtils.newApplicationAttemptId(
appId, 1);
RMAppAttemptMetrics attemptMetric =
new RMAppAttemptMetrics(appAttemptId, rm.getRMContext());
RMAppImpl app = mock(RMAppImpl.class);
when(app.getApplicationId()).thenReturn(appId);
RMAppAttemptImpl attempt = mock(RMAppAttemptImpl.class);
when(attempt.getAppAttemptId()).thenReturn(appAttemptId);
when(attempt.getRMAppAttemptMetrics()).thenReturn(attemptMetric);
when(app.getCurrentAppAttempt()).thenReturn(attempt);
rm.getRMContext().getRMApps().put(appId, app);
SchedulerEvent addAppEvent =
new AppAddedSchedulerEvent(appId, "default", "user");
cs.handle(addAppEvent);
@ -2570,6 +2585,67 @@ public void testAMUsedResource() throws Exception {
rm.stop();
}
// Verifies headroom passed to ApplicationMaster has been updated in
// RMAppAttemptMetrics
@Test
public void testApplicationHeadRoom() throws Exception {
Configuration conf = new Configuration();
conf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class,
ResourceScheduler.class);
MockRM rm = new MockRM(conf);
rm.start();
CapacityScheduler cs = (CapacityScheduler) rm.getResourceScheduler();
ApplicationId appId = BuilderUtils.newApplicationId(100, 1);
ApplicationAttemptId appAttemptId =
BuilderUtils.newApplicationAttemptId(appId, 1);
RMAppAttemptMetrics attemptMetric =
new RMAppAttemptMetrics(appAttemptId, rm.getRMContext());
RMAppImpl app = mock(RMAppImpl.class);
when(app.getApplicationId()).thenReturn(appId);
RMAppAttemptImpl attempt = mock(RMAppAttemptImpl.class);
when(attempt.getAppAttemptId()).thenReturn(appAttemptId);
when(attempt.getRMAppAttemptMetrics()).thenReturn(attemptMetric);
when(app.getCurrentAppAttempt()).thenReturn(attempt);
rm.getRMContext().getRMApps().put(appId, app);
SchedulerEvent addAppEvent =
new AppAddedSchedulerEvent(appId, "default", "user");
cs.handle(addAppEvent);
SchedulerEvent addAttemptEvent =
new AppAttemptAddedSchedulerEvent(appAttemptId, false);
cs.handle(addAttemptEvent);
Allocation allocate =
cs.allocate(appAttemptId, Collections.<ResourceRequest> emptyList(),
Collections.<ContainerId> emptyList(), null, null);
Assert.assertNotNull(attempt);
Assert
.assertEquals(Resource.newInstance(0, 0), allocate.getResourceLimit());
Assert.assertEquals(Resource.newInstance(0, 0),
attemptMetric.getApplicationAttemptHeadroom());
// Add a node to cluster
Resource newResource = Resource.newInstance(4 * GB, 1);
RMNode node = MockNodes.newNodeInfo(0, newResource, 1, "127.0.0.1");
cs.handle(new NodeAddedSchedulerEvent(node));
allocate =
cs.allocate(appAttemptId, Collections.<ResourceRequest> emptyList(),
Collections.<ContainerId> emptyList(), null, null);
// All resources should be sent as headroom
Assert.assertEquals(newResource, allocate.getResourceLimit());
Assert.assertEquals(newResource,
attemptMetric.getApplicationAttemptHeadroom());
rm.stop();
}
private void setMaxAllocMb(Configuration conf, int maxAllocMb) {
conf.setInt(YarnConfiguration.RM_SCHEDULER_MAXIMUM_ALLOCATION_MB,
maxAllocMb);

View File

@ -40,6 +40,7 @@
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppImpl;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptImpl;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptMetrics;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAddedSchedulerEvent;
@ -157,7 +158,7 @@ protected ApplicationAttemptId createSchedulingRequest(
ResourceRequest request = createResourceRequest(memory, vcores, ResourceRequest.ANY,
priority, numContainers, true);
ask.add(request);
scheduler.allocate(id, ask, new ArrayList<ContainerId>(), null, null);
RMApp rmApp = mock(RMApp.class);
RMAppAttempt rmAppAttempt = mock(RMAppAttempt.class);
when(rmApp.getCurrentAppAttempt()).thenReturn(rmAppAttempt);
@ -165,6 +166,8 @@ protected ApplicationAttemptId createSchedulingRequest(
new RMAppAttemptMetrics(id, resourceManager.getRMContext()));
resourceManager.getRMContext().getRMApps()
.put(id.getApplicationId(), rmApp);
scheduler.allocate(id, ask, new ArrayList<ContainerId>(), null, null);
return id;
}
@ -178,7 +181,7 @@ protected ApplicationAttemptId createSchedulingRequest(String queueId,
if (scheduler.getSchedulerApplications().containsKey(id.getApplicationId())) {
scheduler.addApplicationAttempt(id, false, false);
}
scheduler.allocate(id, ask, new ArrayList<ContainerId>(), null, null);
RMApp rmApp = mock(RMApp.class);
RMAppAttempt rmAppAttempt = mock(RMAppAttempt.class);
when(rmApp.getCurrentAppAttempt()).thenReturn(rmAppAttempt);
@ -186,6 +189,8 @@ protected ApplicationAttemptId createSchedulingRequest(String queueId,
new RMAppAttemptMetrics(id,resourceManager.getRMContext()));
resourceManager.getRMContext().getRMApps()
.put(id.getApplicationId(), rmApp);
scheduler.allocate(id, ask, new ArrayList<ContainerId>(), null, null);
return id;
}
@ -225,4 +230,17 @@ protected void createApplicationWithAMResource(ApplicationAttemptId attId,
new AppAttemptAddedSchedulerEvent(attId, false);
scheduler.handle(attempAddedEvent);
}
protected RMApp createMockRMApp(ApplicationAttemptId attemptId) {
RMApp app = mock(RMAppImpl.class);
when(app.getApplicationId()).thenReturn(attemptId.getApplicationId());
RMAppAttemptImpl attempt = mock(RMAppAttemptImpl.class);
when(attempt.getAppAttemptId()).thenReturn(attemptId);
RMAppAttemptMetrics attemptMetric = mock(RMAppAttemptMetrics.class);
when(attempt.getRMAppAttemptMetrics()).thenReturn(attemptMetric);
when(app.getCurrentAppAttempt()).thenReturn(attempt);
resourceManager.getRMContext().getRMApps()
.put(attemptId.getApplicationId(), app);
return app;
}
}

View File

@ -27,6 +27,7 @@
import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.server.resourcemanager.MockNodes;
import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppImpl;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeAddedSchedulerEvent;
@ -96,6 +97,8 @@ public void testSchedulingDelay() throws InterruptedException {
// and ANY requests
ApplicationAttemptId appAttemptId =
createAppAttemptId(this.APP_ID++, this.ATTEMPT_ID++);
createMockRMApp(appAttemptId);
scheduler.addApplication(appAttemptId.getApplicationId(), "queue11", "user11", false);
scheduler.addApplicationAttempt(appAttemptId, false, false);
List<ResourceRequest> ask = new ArrayList<>();

View File

@ -1379,12 +1379,16 @@ public void testQueueDemandCalculation() throws Exception {
scheduler.reinitialize(conf, resourceManager.getRMContext());
ApplicationAttemptId id11 = createAppAttemptId(1, 1);
createMockRMApp(id11);
scheduler.addApplication(id11.getApplicationId(), "root.queue1", "user1", false);
scheduler.addApplicationAttempt(id11, false, false);
ApplicationAttemptId id21 = createAppAttemptId(2, 1);
createMockRMApp(id21);
scheduler.addApplication(id21.getApplicationId(), "root.queue2", "user1", false);
scheduler.addApplicationAttempt(id21, false, false);
ApplicationAttemptId id22 = createAppAttemptId(2, 2);
createMockRMApp(id22);
scheduler.addApplication(id22.getApplicationId(), "root.queue2", "user1", false);
scheduler.addApplicationAttempt(id22, false, false);
@ -2717,9 +2721,13 @@ public void testMultipleNodesSingleRackRequest() throws Exception {
NodeAddedSchedulerEvent nodeEvent2 = new NodeAddedSchedulerEvent(node2);
scheduler.handle(nodeEvent2);
ApplicationAttemptId appId = createAppAttemptId(this.APP_ID++, this.ATTEMPT_ID++);
scheduler.addApplication(appId.getApplicationId(), "queue1", "user1", false);
scheduler.addApplicationAttempt(appId, false, false);
ApplicationAttemptId attemptId =
createAppAttemptId(this.APP_ID++, this.ATTEMPT_ID++);
createMockRMApp(attemptId);
scheduler.addApplication(attemptId.getApplicationId(), "queue1", "user1",
false);
scheduler.addApplicationAttempt(attemptId, false, false);
// 1 request with 2 nodes on the same rack. another request with 1 node on
// a different rack
@ -2731,21 +2739,24 @@ public void testMultipleNodesSingleRackRequest() throws Exception {
asks.add(createResourceRequest(1024, node3.getRackName(), 1, 1, true));
asks.add(createResourceRequest(1024, ResourceRequest.ANY, 1, 2, true));
scheduler.allocate(appId, asks, new ArrayList<ContainerId>(), null, null);
scheduler.allocate(attemptId, asks, new ArrayList<ContainerId>(), null,
null);
// node 1 checks in
scheduler.update();
NodeUpdateSchedulerEvent updateEvent1 = new NodeUpdateSchedulerEvent(node1);
scheduler.handle(updateEvent1);
// should assign node local
assertEquals(1, scheduler.getSchedulerApp(appId).getLiveContainers().size());
assertEquals(1, scheduler.getSchedulerApp(attemptId).getLiveContainers()
.size());
// node 2 checks in
scheduler.update();
NodeUpdateSchedulerEvent updateEvent2 = new NodeUpdateSchedulerEvent(node2);
scheduler.handle(updateEvent2);
// should assign rack local
assertEquals(2, scheduler.getSchedulerApp(appId).getLiveContainers().size());
assertEquals(2, scheduler.getSchedulerApp(attemptId).getLiveContainers()
.size());
}
@Test (timeout = 5000)
@ -3855,6 +3866,8 @@ public void testContinuousScheduling() throws Exception {
// send application request
ApplicationAttemptId appAttemptId =
createAppAttemptId(this.APP_ID++, this.ATTEMPT_ID++);
createMockRMApp(appAttemptId);
scheduler.addApplication(appAttemptId.getApplicationId(), "queue11", "user11", false);
scheduler.addApplicationAttempt(appAttemptId, false, false);
List<ResourceRequest> ask = new ArrayList<ResourceRequest>();

View File

@ -21,6 +21,7 @@
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import java.io.IOException;
import java.lang.reflect.Method;
@ -57,6 +58,9 @@
import org.apache.hadoop.yarn.server.resourcemanager.Task;
import org.apache.hadoop.yarn.server.resourcemanager.ahs.RMApplicationHistoryWriter;
import org.apache.hadoop.yarn.server.resourcemanager.metrics.SystemMetricsPublisher;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppImpl;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptImpl;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptMetrics;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
@ -212,9 +216,12 @@ public void testNodeLocalAssignment() throws Exception {
int _appAttemptId = 1;
ApplicationAttemptId appAttemptId = createAppAttemptId(_appId,
_appAttemptId);
createMockRMApp(appAttemptId, rmContext);
AppAddedSchedulerEvent appEvent =
new AppAddedSchedulerEvent(appAttemptId.getApplicationId(), "queue1",
"user1");
"user1");
scheduler.handle(appEvent);
AppAttemptAddedSchedulerEvent attemptEvent =
new AppAttemptAddedSchedulerEvent(appAttemptId, false);
@ -310,6 +317,8 @@ public Map<NodeId, FiCaSchedulerNode> getNodes(){
int _appAttemptId = 1;
ApplicationAttemptId appAttemptId = createAppAttemptId(_appId,
_appAttemptId);
createMockRMApp(appAttemptId, rmContext);
AppAddedSchedulerEvent appEvent =
new AppAddedSchedulerEvent(appAttemptId.getApplicationId(), "queue1",
"user1");
@ -574,6 +583,9 @@ public void testBlackListNodes() throws Exception {
ApplicationId appId = BuilderUtils.newApplicationId(100, 1);
ApplicationAttemptId appAttemptId = BuilderUtils.newApplicationAttemptId(
appId, 1);
createMockRMApp(appAttemptId, rm.getRMContext());
SchedulerEvent appEvent =
new AppAddedSchedulerEvent(appId, "default",
"user");
@ -643,4 +655,18 @@ public static void main(String[] arg) throws Exception {
t.testFifoScheduler();
t.tearDown();
}
private RMAppImpl createMockRMApp(ApplicationAttemptId attemptId,
RMContext context) {
RMAppImpl app = mock(RMAppImpl.class);
when(app.getApplicationId()).thenReturn(attemptId.getApplicationId());
RMAppAttemptImpl attempt = mock(RMAppAttemptImpl.class);
when(attempt.getAppAttemptId()).thenReturn(attemptId);
RMAppAttemptMetrics attemptMetric = mock(RMAppAttemptMetrics.class);
when(attempt.getRMAppAttemptMetrics()).thenReturn(attemptMetric);
when(app.getCurrentAppAttempt()).thenReturn(attempt);
context.getRMApps().putIfAbsent(attemptId.getApplicationId(), app);
return app;
}
}

View File

@ -48,7 +48,7 @@ public class TestNodesPage {
// Number of Actual Table Headers for NodesPage.NodesBlock might change in
// future. In that case this value should be adjusted to the new value.
final int numberOfThInMetricsTable = 16;
final int numberOfThInMetricsTable = 20;
final int numberOfActualTableHeaders = 13;
private Injector injector;

View File

@ -347,7 +347,7 @@ private void verifySubQueue(JSONObject info, String q,
int numExpectedElements = 13;
boolean isParentQueue = true;
if (!info.has("queues")) {
numExpectedElements = 24;
numExpectedElements = 25;
isParentQueue = false;
}
assertEquals("incorrect number of elements", numExpectedElements, info.length());