YARN-3273. Improve scheduler UI to facilitate scheduling analysis and debugging. Contributed Rohith Sharmaks
(cherry picked from commit 658097d6da1b1aac8e01db459f0c3b456e99652f)

Conflicts:
    hadoop-yarn-project/CHANGES.txt
    hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/webapp/AppAttemptBlock.java
    hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/CapacitySchedulerLeafQueueInfo.java
parent 3f1d1d0b48
commit edc3cb736e
@@ -338,6 +338,9 @@ Release 2.7.0 - UNRELEASED
YARN-2777. Mark the end of individual log in aggregated log.
(Varun Saxena via xgong)

YARN-3273. Improve scheduler UI to facilitate scheduling analysis and
debugging. (Rohith Sharmaks via jianhe)

OPTIMIZATIONS

YARN-2990. FairScheduler's delay-scheduling always waits for node-local and
@@ -162,8 +162,6 @@ public Collection<ContainerReport> run() throws Exception {
._("Diagnostics Info:", appAttempt.getDiagnosticsInfo() == null ?
"" : appAttempt.getDiagnosticsInfo());

html._(InfoBlock.class);

if (exceptionWhenGetContainerReports) {
html
.p()
@@ -41,6 +41,8 @@ public class RMAppAttemptMetrics {
private ApplicationAttemptId attemptId = null;
// preemption info
private Resource resourcePreempted = Resource.newInstance(0, 0);
// application headroom
private volatile Resource applicationHeadroom = Resource.newInstance(0, 0);
private AtomicInteger numNonAMContainersPreempted = new AtomicInteger(0);
private AtomicBoolean isPreempted = new AtomicBoolean(false);
@@ -145,4 +147,12 @@ public int[][] getLocalityStatistics() {
public int getTotalAllocatedContainers() {
return this.totalAllocatedContainers;
}

public Resource getApplicationAttemptHeadroom() {
return applicationHeadroom;
}

public void setApplicationAttemptHeadRoom(Resource headRoom) {
this.applicationHeadroom = headRoom;
}
}
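The two RMAppAttemptMetrics hunks above add a volatile per-attempt headroom field together with a getter and setter, so the last headroom value handed to the ApplicationMaster can be inspected later (the new tests further down read it back). A minimal sketch of reading that value through the RMContext, assuming an rmContext and attemptId are already in hand; the class and method names here are illustrative, not part of the patch:

import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;

public class HeadroomProbe {
  // Illustrative helper: returns the headroom most recently recorded for the
  // current attempt of the given application, or null if the app/attempt is gone.
  static Resource lastReportedHeadroom(RMContext rmContext,
      ApplicationAttemptId attemptId) {
    RMApp app = rmContext.getRMApps().get(attemptId.getApplicationId());
    if (app == null) {
      return null;
    }
    RMAppAttempt attempt = app.getCurrentAppAttempt();
    if (attempt == null) {
      return null;
    }
    return attempt.getRMAppAttemptMetrics().getApplicationAttemptHeadroom();
  }
}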
@@ -632,4 +632,14 @@ public void incNumAllocatedContainers(NodeType containerType,
requestType);
}
}

public void setApplicationHeadroomForMetrics(Resource headroom) {
RMAppAttempt attempt =
rmContext.getRMApps().get(attemptId.getApplicationId())
.getCurrentAppAttempt();
if (attempt != null) {
attempt.getRMAppAttemptMetrics().setApplicationAttemptHeadRoom(
Resources.clone(headroom));
}
}
}
@@ -420,10 +420,13 @@ public synchronized User getUser(String userName) {
*/
public synchronized ArrayList<UserInfo> getUsers() {
ArrayList<UserInfo> usersToReturn = new ArrayList<UserInfo>();
for (Map.Entry<String, User> entry: users.entrySet()) {
usersToReturn.add(new UserInfo(entry.getKey(), Resources.clone(
entry.getValue().getUsed()), entry.getValue().getActiveApplications(),
entry.getValue().getPendingApplications()));
for (Map.Entry<String, User> entry : users.entrySet()) {
User user = entry.getValue();
usersToReturn.add(new UserInfo(entry.getKey(), Resources.clone(user
.getUsed()), user.getActiveApplications(), user
.getPendingApplications(), Resources.clone(user
.getConsumedAMResources()), Resources.clone(user
.getUserResourceLimit())));
}
return usersToReturn;
}
@@ -1158,7 +1161,7 @@ private Resource computeUserLimit(FiCaSchedulerApp application,
" clusterCapacity: " + clusterResource
);
}

user.setUserResourceLimit(limit);
return limit;
}
@@ -1818,6 +1821,7 @@ resourceCalculator, this, getParent(), clusterResource,
@VisibleForTesting
public static class User {
ResourceUsage userResourceUsage = new ResourceUsage();
volatile Resource userResourceLimit = Resource.newInstance(0, 0);
int pendingApplications = 0;
int activeApplications = 0;
@@ -1887,6 +1891,14 @@ public void releaseContainer(Resource resource, Set<String> nodeLabels) {
}
}
}

public Resource getUserResourceLimit() {
return userResourceLimit;
}

public void setUserResourceLimit(Resource userResourceLimit) {
this.userResourceLimit = userResourceLimit;
}
}

@Override
@@ -32,14 +32,19 @@ public class UserInfo {
protected ResourceInfo resourcesUsed;
protected int numPendingApplications;
protected int numActiveApplications;
protected ResourceInfo AMResourceUsed;
protected ResourceInfo userResourceLimit;

UserInfo() {}

UserInfo(String username, Resource resUsed, int activeApps, int pendingApps) {
UserInfo(String username, Resource resUsed, int activeApps, int pendingApps,
Resource amResUsed, Resource resourceLimit) {
this.username = username;
this.resourcesUsed = new ResourceInfo(resUsed);
this.numActiveApplications = activeApps;
this.numPendingApplications = pendingApps;
this.AMResourceUsed = new ResourceInfo(amResUsed);
this.userResourceLimit = new ResourceInfo(resourceLimit);
}

public String getUsername() {
@@ -57,4 +62,12 @@ public int getNumPendingApplications() {
public int getNumActiveApplications() {
return numActiveApplications;
}

public ResourceInfo getAMResourcesUsed() {
return AMResourceUsed;
}

public ResourceInfo getUserResourceLimit() {
return userResourceLimit;
}
}
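The LeafQueue.getUsers() change above now also passes each user's consumed AM resources and computed user resource limit into the UserInfo DAO, and UserInfo exposes them through getAMResourcesUsed() and getUserResourceLimit(). A small sketch of walking a leaf queue's user list with the extended getters, assuming a CapacitySchedulerLeafQueueInfo is already available from the scheduler web DAOs; the wrapper class and method name are illustrative:

import java.util.ArrayList;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.CapacitySchedulerLeafQueueInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.UserInfo;

public class LeafQueueUserDump {
  // Illustrative: prints one line per active user of the given leaf queue,
  // using the getters added or extended by this change.
  static void dumpUsers(CapacitySchedulerLeafQueueInfo lqinfo) {
    ArrayList<UserInfo> users = lqinfo.getUsers().getUsersList();
    for (UserInfo user : users) {
      System.out.println(user.getUsername()
          + " used=" + user.getResourcesUsed()
          + " limit=" + user.getUserResourceLimit()
          + " amUsed=" + user.getAMResourcesUsed()
          + " schedulable=" + user.getNumActiveApplications()
          + " nonSchedulable=" + user.getNumPendingApplications());
    }
  }
}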
@@ -268,7 +268,9 @@ public synchronized Allocation getAllocation(ResourceCalculator rc,
minimumAllocation, numCont);
ContainersAndNMTokensAllocation allocation =
pullNewlyAllocatedContainersAndNMTokens();
return new Allocation(allocation.getContainerList(), getHeadroom(), null,
Resource headroom = getHeadroom();
setApplicationHeadroomForMetrics(headroom);
return new Allocation(allocation.getContainerList(), headroom, null,
currentContPreemption, Collections.singletonList(rr),
allocation.getNMTokenList());
}
@@ -938,9 +938,10 @@ clusterResource, minimumAllocation, getMaximumResourceCapability(),
application.updateBlacklist(blacklistAdditions, blacklistRemovals);
ContainersAndNMTokensAllocation allocation =
application.pullNewlyAllocatedContainersAndNMTokens();
return new Allocation(allocation.getContainerList(),
application.getHeadroom(), preemptionContainerIds, null, null,
allocation.getNMTokenList());
Resource headroom = application.getHeadroom();
application.setApplicationHeadroomForMetrics(headroom);
return new Allocation(allocation.getContainerList(), headroom,
preemptionContainerIds, null, null, allocation.getNMTokenList());
}
}
@@ -343,9 +343,10 @@ public Allocation allocate(
application.updateBlacklist(blacklistAdditions, blacklistRemovals);
ContainersAndNMTokensAllocation allocation =
application.pullNewlyAllocatedContainersAndNMTokens();
return new Allocation(allocation.getContainerList(),
application.getHeadroom(), null, null, null,
allocation.getNMTokenList());
Resource headroom = application.getHeadroom();
application.setApplicationHeadroomForMetrics(headroom);
return new Allocation(allocation.getContainerList(), headroom, null,
null, null, allocation.getNMTokenList());
}
}
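The three allocate-path hunks above all follow the same pattern: compute the headroom once, record it on the attempt metrics through the new setApplicationHeadroomForMetrics, and return the identical value to the ApplicationMaster. On the AM side that value arrives as the available resources in the allocate response, which is what the new RMAppAttemptMetrics field mirrors. A rough sketch of observing it from an ApplicationMaster, assuming an AMRMClient that has already been started and registered; the wrapper class is illustrative:

import java.io.IOException;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.client.api.AMRMClient;
import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest;
import org.apache.hadoop.yarn.exceptions.YarnException;

public class HeadroomLogger {
  // Illustrative: issues one heartbeat and returns the headroom the RM reported.
  // The client is assumed to be initialized, started, and registered already.
  static Resource currentHeadroom(AMRMClient<ContainerRequest> amRMClient)
      throws YarnException, IOException {
    AllocateResponse response = amRMClient.allocate(0.0f);
    return response.getAvailableResources();
  }
}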
@@ -37,6 +37,8 @@
import org.apache.hadoop.yarn.webapp.hamlet.Hamlet;
import org.apache.hadoop.yarn.webapp.hamlet.Hamlet.DIV;
import org.apache.hadoop.yarn.webapp.hamlet.Hamlet.LI;
import org.apache.hadoop.yarn.webapp.hamlet.Hamlet.TABLE;
import org.apache.hadoop.yarn.webapp.hamlet.Hamlet.TBODY;
import org.apache.hadoop.yarn.webapp.hamlet.Hamlet.UL;
import org.apache.hadoop.yarn.webapp.view.HtmlBlock;
import org.apache.hadoop.yarn.webapp.view.InfoBlock;
@@ -67,41 +69,8 @@ static class LeafQueueInfoBlock extends HtmlBlock {
lqinfo = (CapacitySchedulerLeafQueueInfo) info.qinfo;
}

//Return a string describing one resource as a percentage of another
private String getPercentage(ResourceInfo numerator, ResourceInfo denominator) {
StringBuilder percentString = new StringBuilder("Memory: ");
if (numerator != null) {
percentString.append(numerator.getMemory());
}
if (denominator.getMemory() != 0) {
percentString.append(" (<span title='of used resources in this queue'>")
.append(StringUtils.format("%.2f", numerator.getMemory() * 100.0 /
denominator.getMemory()) + "%</span>)");
}
percentString.append(", vCores: ");
if (numerator != null) {
percentString.append(numerator.getvCores());
}
if (denominator.getvCores() != 0) {
percentString.append(" (<span title='of used resources in this queue'>")
.append(StringUtils.format("%.2f", numerator.getvCores() * 100.0 /
denominator.getvCores()) + "%</span>)");
}
return percentString.toString();
}

@Override
protected void render(Block html) {
StringBuilder activeUserList = new StringBuilder("");
ResourceInfo usedResources = lqinfo.getResourcesUsed();
ArrayList<UserInfo> users = lqinfo.getUsers().getUsersList();
for (UserInfo entry: users) {
activeUserList.append(entry.getUsername()).append(" <")
.append(getPercentage(entry.getResourcesUsed(), usedResources))
.append(", Schedulable Apps: " + entry.getNumActiveApplications())
.append(", Non-Schedulable Apps: " + entry.getNumPendingApplications())
.append("><br style='display:block'>"); //Force line break
}

ResponseInfo ri = info("\'" + lqinfo.getQueuePath().substring(5) + "\' Queue Status").
_("Queue State:", lqinfo.getQueueState()).
@@ -116,12 +85,12 @@ protected void render(Block html) {
_("Max Applications:", Integer.toString(lqinfo.getMaxApplications())).
_("Max Applications Per User:", Integer.toString(lqinfo.getMaxApplicationsPerUser())).
_("Max Application Master Resources:", lqinfo.getAMResourceLimit().toString()).
_("Used Application Master Resources:", lqinfo.getUsedAMResource().toString()).
_("Max Application Master Resources Per User:", lqinfo.getUserAMResourceLimit().toString()).
_("Configured Capacity:", percent(lqinfo.getCapacity() / 100)).
_("Configured Max Capacity:", percent(lqinfo.getMaxCapacity() / 100)).
_("Configured Minimum User Limit Percent:", Integer.toString(lqinfo.getUserLimit()) + "%").
_("Configured User Limit Factor:", String.format("%.1f", lqinfo.getUserLimitFactor())).
_r("Active Users: ", activeUserList.toString()).
_("Accessible Node Labels:", StringUtils.join(",", lqinfo.getNodeLabels())).
_("Preemption:", lqinfo.getPreemptionDisabled() ? "disabled" : "enabled");
@@ -132,6 +101,44 @@ protected void render(Block html) {
}
}

static class QueueUsersInfoBlock extends HtmlBlock {
final CapacitySchedulerLeafQueueInfo lqinfo;

@Inject
QueueUsersInfoBlock(ViewContext ctx, CSQInfo info) {
super(ctx);
lqinfo = (CapacitySchedulerLeafQueueInfo) info.qinfo;
}

@Override
protected void render(Block html) {
TBODY<TABLE<Hamlet>> tbody =
html.table("#userinfo").thead().$class("ui-widget-header").tr().th()
.$class("ui-state-default")._("User Name")._().th()
.$class("ui-state-default")._("Max Resource")._().th()
.$class("ui-state-default")._("Used Resource")._().th()
.$class("ui-state-default")._("Max AM Resource")._().th()
.$class("ui-state-default")._("Used AM Resource")._().th()
.$class("ui-state-default")._("Schedulable Apps")._().th()
.$class("ui-state-default")._("Non-Schedulable Apps")._()._()._()
.tbody();

ArrayList<UserInfo> users = lqinfo.getUsers().getUsersList();
for (UserInfo userInfo : users) {
tbody.tr().td(userInfo.getUsername())
.td(userInfo.getUserResourceLimit().toString())
.td(userInfo.getResourcesUsed().toString())
.td(lqinfo.getUserAMResourceLimit().toString())
.td(userInfo.getAMResourcesUsed().toString())
.td(Integer.toString(userInfo.getNumActiveApplications()))
.td(Integer.toString(userInfo.getNumPendingApplications()))._();
}

html.div().$class("usersinfo").h5("Active Users Info")._();
tbody._()._();
}
}

public static class QueueBlock extends HtmlBlock {
final CSQInfo csqinfo;
@@ -166,6 +173,7 @@ public void render(Block html) {
csqinfo.qinfo = info;
if (info.getQueues() == null) {
li.ul("#lq").li()._(LeafQueueInfoBlock.class)._()._();
li.ul("#lq").li()._(QueueUsersInfoBlock.class)._()._();
} else {
li._(QueueBlock.class);
}
@@ -21,6 +21,7 @@
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ClusterMetricsInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.SchedulerInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.UserMetricsInfo;

import org.apache.hadoop.yarn.webapp.hamlet.Hamlet;
@@ -153,6 +154,27 @@ protected void render(Block html) {

}
}

SchedulerInfo schedulerInfo=new SchedulerInfo(this.rm);

div.h3("Scheduler Metrics").
table("#schedulermetricsoverview").
thead().$class("ui-widget-header").
tr().
th().$class("ui-state-default")._("Scheduler Type")._().
th().$class("ui-state-default")._("Scheduling Resource Type")._().
th().$class("ui-state-default")._("Minimum Allocation")._().
th().$class("ui-state-default")._("Maximum Allocation")._().
_().
_().
tbody().$class("ui-widget-content").
tr().
td(String.valueOf(schedulerInfo.getSchedulerType())).
td(String.valueOf(schedulerInfo.getSchedulerResourceTypes())).
td(schedulerInfo.getMinAllocation().toString()).
td(schedulerInfo.getMaxAllocation().toString()).
_().
_()._();

div._();
}
@@ -36,6 +36,7 @@ public class CapacitySchedulerLeafQueueInfo extends CapacitySchedulerQueueInfo {
protected UsersInfo users; // To add another level in the XML
protected float userLimitFactor;
protected ResourceInfo AMResourceLimit;
protected ResourceInfo usedAMResource;
protected ResourceInfo userAMResourceLimit;
protected boolean preemptionDisabled;
@@ -53,6 +54,7 @@ public class CapacitySchedulerLeafQueueInfo extends CapacitySchedulerQueueInfo {
users = new UsersInfo(q.getUsers());
userLimitFactor = q.getUserLimitFactor();
AMResourceLimit = new ResourceInfo(q.getAMResourceLimit());
usedAMResource = new ResourceInfo(q.getQueueResourceUsage().getAMUsed());
userAMResourceLimit = new ResourceInfo(q.getUserAMResourceLimit());
preemptionDisabled = q.getPreemptionDisabled();
}
@@ -94,6 +96,10 @@ public ResourceInfo getAMResourceLimit() {
return AMResourceLimit;
}

public ResourceInfo getUsedAMResource() {
return usedAMResource;
}

public ResourceInfo getUserAMResourceLimit() {
return userAMResourceLimit;
}
@@ -18,15 +18,59 @@

package org.apache.hadoop.yarn.server.resourcemanager.webapp.dao;

import java.util.EnumSet;

import javax.xml.bind.annotation.XmlRootElement;
import javax.xml.bind.annotation.XmlSeeAlso;

import org.apache.hadoop.yarn.proto.YarnServiceProtos.SchedulerResourceTypes;
import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fifo.FifoScheduler;

@XmlRootElement
@XmlSeeAlso({ CapacitySchedulerInfo.class, FairSchedulerInfo.class,
FifoSchedulerInfo.class })
public class SchedulerInfo {
protected String schedulerName;
protected ResourceInfo minAllocResource;
protected ResourceInfo maxAllocResource;
protected EnumSet<SchedulerResourceTypes> schedulingResourceTypes;

public SchedulerInfo() {
} // JAXB needs this

public SchedulerInfo(final ResourceManager rm) {
ResourceScheduler rs = rm.getResourceScheduler();

if (rs instanceof CapacityScheduler) {
this.schedulerName = "Capacity Scheduler";
} else if (rs instanceof FairScheduler) {
this.schedulerName = "Fair Scheduler";
} else if (rs instanceof FifoScheduler) {
this.schedulerName = "Fifo Scheduler";
}
this.minAllocResource = new ResourceInfo(rs.getMinimumResourceCapability());
this.maxAllocResource = new ResourceInfo(rs.getMaximumResourceCapability());
this.schedulingResourceTypes = rs.getSchedulingResourceTypes();
}

public String getSchedulerType() {
return this.schedulerName;
}

public ResourceInfo getMinAllocation() {
return this.minAllocResource;
}

public ResourceInfo getMaxAllocation() {
return this.maxAllocResource;
}

public String getSchedulerResourceTypes() {
return this.schedulingResourceTypes.toString();
}

}
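The new SchedulerInfo DAO summarizes the active scheduler (type, resource types used for scheduling, minimum and maximum allocation) for the web UI; MetricsOverviewTable above builds its Scheduler Metrics table straight from it. A brief sketch of the same usage outside the page code, assuming a running ResourceManager instance; the wrapper class and method name are illustrative:

import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.SchedulerInfo;

public class SchedulerSummary {
  // Illustrative: builds the DAO the same way MetricsOverviewTable does and
  // prints the four values shown in the new Scheduler Metrics table.
  static String summarize(ResourceManager rm) {
    SchedulerInfo info = new SchedulerInfo(rm);
    return info.getSchedulerType()
        + " scheduling on " + info.getSchedulerResourceTypes()
        + ", min allocation " + info.getMinAllocation()
        + ", max allocation " + info.getMaxAllocation();
  }
}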
@@ -45,7 +45,10 @@
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
import org.apache.hadoop.yarn.server.api.protocolrecords.UpdateNodeResourceRequest;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppImpl;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptImpl;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptMetrics;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation;
@@ -67,7 +70,9 @@
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;

import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

public class TestFifoScheduler {
private static final Log LOG = LogFactory.getLog(TestFifoScheduler.class);
@@ -374,6 +379,8 @@ public void testBlackListNodes() throws Exception {
ApplicationId appId1 = BuilderUtils.newApplicationId(100, 1);
ApplicationAttemptId appAttemptId1 = BuilderUtils.newApplicationAttemptId(
appId1, 1);
createMockRMApp(appAttemptId1, rm.getRMContext());

SchedulerEvent appEvent =
new AppAddedSchedulerEvent(appId1, "queue", "user");
fs.handle(appEvent);
@@ -468,6 +475,7 @@ public void testHeadroom() throws Exception {
ApplicationId appId1 = BuilderUtils.newApplicationId(100, 1);
ApplicationAttemptId appAttemptId1 = BuilderUtils.newApplicationAttemptId(
appId1, 1);
createMockRMApp(appAttemptId1, rm.getRMContext());
SchedulerEvent appEvent =
new AppAddedSchedulerEvent(appId1, "queue", "user");
fs.handle(appEvent);
@@ -478,6 +486,7 @@ public void testHeadroom() throws Exception {
ApplicationId appId2 = BuilderUtils.newApplicationId(200, 2);
ApplicationAttemptId appAttemptId2 = BuilderUtils.newApplicationAttemptId(
appId2, 1);
createMockRMApp(appAttemptId2, rm.getRMContext());
SchedulerEvent appEvent2 =
new AppAddedSchedulerEvent(appId2, "queue", "user");
fs.handle(appEvent2);
@@ -604,4 +613,17 @@ public static void main(String[] args) throws Exception {
t.testNonDefaultMinimumAllocation();
t.testReconnectedNode();
}

private RMAppImpl createMockRMApp(ApplicationAttemptId attemptId,
RMContext context) {
RMAppImpl app = mock(RMAppImpl.class);
when(app.getApplicationId()).thenReturn(attemptId.getApplicationId());
RMAppAttemptImpl attempt = mock(RMAppAttemptImpl.class);
when(attempt.getAppAttemptId()).thenReturn(attemptId);
RMAppAttemptMetrics attemptMetric = mock(RMAppAttemptMetrics.class);
when(attempt.getRMAppAttemptMetrics()).thenReturn(attemptMetric);
when(app.getCurrentAppAttempt()).thenReturn(attempt);
context.getRMApps().putIfAbsent(attemptId.getApplicationId(), app);
return app;
}
}
@@ -89,15 +89,18 @@
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.MemoryRMStateStore;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppImpl;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppMetrics;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptImpl;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptMetrics;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplication;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt;
@@ -634,6 +637,18 @@ public void testBlackListNodes() throws Exception {
ApplicationId appId = BuilderUtils.newApplicationId(100, 1);
ApplicationAttemptId appAttemptId = BuilderUtils.newApplicationAttemptId(
appId, 1);

RMAppAttemptMetrics attemptMetric =
new RMAppAttemptMetrics(appAttemptId, rm.getRMContext());
RMAppImpl app = mock(RMAppImpl.class);
when(app.getApplicationId()).thenReturn(appId);
RMAppAttemptImpl attempt = mock(RMAppAttemptImpl.class);
when(attempt.getAppAttemptId()).thenReturn(appAttemptId);
when(attempt.getRMAppAttemptMetrics()).thenReturn(attemptMetric);
when(app.getCurrentAppAttempt()).thenReturn(attempt);

rm.getRMContext().getRMApps().put(appId, app);

SchedulerEvent addAppEvent =
new AppAddedSchedulerEvent(appId, "default", "user");
cs.handle(addAppEvent);
@@ -2484,6 +2499,67 @@ public void testHierarchyQueuesCurrentLimits() throws Exception {
am1.doHeartbeat().getAvailableResources().getMemory());
}

// Verifies headroom passed to ApplicationMaster has been updated in
// RMAppAttemptMetrics
@Test
public void testApplicationHeadRoom() throws Exception {
Configuration conf = new Configuration();
conf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class,
ResourceScheduler.class);
MockRM rm = new MockRM(conf);
rm.start();
CapacityScheduler cs = (CapacityScheduler) rm.getResourceScheduler();

ApplicationId appId = BuilderUtils.newApplicationId(100, 1);
ApplicationAttemptId appAttemptId =
BuilderUtils.newApplicationAttemptId(appId, 1);

RMAppAttemptMetrics attemptMetric =
new RMAppAttemptMetrics(appAttemptId, rm.getRMContext());
RMAppImpl app = mock(RMAppImpl.class);
when(app.getApplicationId()).thenReturn(appId);
RMAppAttemptImpl attempt = mock(RMAppAttemptImpl.class);
when(attempt.getAppAttemptId()).thenReturn(appAttemptId);
when(attempt.getRMAppAttemptMetrics()).thenReturn(attemptMetric);
when(app.getCurrentAppAttempt()).thenReturn(attempt);

rm.getRMContext().getRMApps().put(appId, app);

SchedulerEvent addAppEvent =
new AppAddedSchedulerEvent(appId, "default", "user");
cs.handle(addAppEvent);
SchedulerEvent addAttemptEvent =
new AppAttemptAddedSchedulerEvent(appAttemptId, false);
cs.handle(addAttemptEvent);

Allocation allocate =
cs.allocate(appAttemptId, Collections.<ResourceRequest> emptyList(),
Collections.<ContainerId> emptyList(), null, null);

Assert.assertNotNull(attempt);

Assert
.assertEquals(Resource.newInstance(0, 0), allocate.getResourceLimit());
Assert.assertEquals(Resource.newInstance(0, 0),
attemptMetric.getApplicationAttemptHeadroom());

// Add a node to cluster
Resource newResource = Resource.newInstance(4 * GB, 1);
RMNode node = MockNodes.newNodeInfo(0, newResource, 1, "127.0.0.1");
cs.handle(new NodeAddedSchedulerEvent(node));

allocate =
cs.allocate(appAttemptId, Collections.<ResourceRequest> emptyList(),
Collections.<ContainerId> emptyList(), null, null);

// All resources should be sent as headroom
Assert.assertEquals(newResource, allocate.getResourceLimit());
Assert.assertEquals(newResource,
attemptMetric.getApplicationAttemptHeadroom());

rm.stop();
}

private void setMaxAllocMb(Configuration conf, int maxAllocMb) {
conf.setInt(YarnConfiguration.RM_SCHEDULER_MAXIMUM_ALLOCATION_MB,
maxAllocMb);
@@ -40,6 +40,7 @@
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppImpl;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptImpl;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptMetrics;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAddedSchedulerEvent;
@@ -157,7 +158,7 @@ protected ApplicationAttemptId createSchedulingRequest(
ResourceRequest request = createResourceRequest(memory, vcores, ResourceRequest.ANY,
priority, numContainers, true);
ask.add(request);
scheduler.allocate(id, ask, new ArrayList<ContainerId>(), null, null);

RMApp rmApp = mock(RMApp.class);
RMAppAttempt rmAppAttempt = mock(RMAppAttempt.class);
when(rmApp.getCurrentAppAttempt()).thenReturn(rmAppAttempt);
@@ -165,6 +166,8 @@ protected ApplicationAttemptId createSchedulingRequest(
new RMAppAttemptMetrics(id, resourceManager.getRMContext()));
resourceManager.getRMContext().getRMApps()
.put(id.getApplicationId(), rmApp);

scheduler.allocate(id, ask, new ArrayList<ContainerId>(), null, null);
return id;
}
@@ -178,7 +181,7 @@ protected ApplicationAttemptId createSchedulingRequest(String queueId,
if (scheduler.getSchedulerApplications().containsKey(id.getApplicationId())) {
scheduler.addApplicationAttempt(id, false, false);
}
scheduler.allocate(id, ask, new ArrayList<ContainerId>(), null, null);

RMApp rmApp = mock(RMApp.class);
RMAppAttempt rmAppAttempt = mock(RMAppAttempt.class);
when(rmApp.getCurrentAppAttempt()).thenReturn(rmAppAttempt);
@@ -186,6 +189,8 @@ protected ApplicationAttemptId createSchedulingRequest(String queueId,
new RMAppAttemptMetrics(id,resourceManager.getRMContext()));
resourceManager.getRMContext().getRMApps()
.put(id.getApplicationId(), rmApp);

scheduler.allocate(id, ask, new ArrayList<ContainerId>(), null, null);
return id;
}
@@ -225,4 +230,17 @@ protected void createApplicationWithAMResource(ApplicationAttemptId attId,
new AppAttemptAddedSchedulerEvent(attId, false);
scheduler.handle(attempAddedEvent);
}

protected RMApp createMockRMApp(ApplicationAttemptId attemptId) {
RMApp app = mock(RMAppImpl.class);
when(app.getApplicationId()).thenReturn(attemptId.getApplicationId());
RMAppAttemptImpl attempt = mock(RMAppAttemptImpl.class);
when(attempt.getAppAttemptId()).thenReturn(attemptId);
RMAppAttemptMetrics attemptMetric = mock(RMAppAttemptMetrics.class);
when(attempt.getRMAppAttemptMetrics()).thenReturn(attemptMetric);
when(app.getCurrentAppAttempt()).thenReturn(attempt);
resourceManager.getRMContext().getRMApps()
.put(attemptId.getApplicationId(), app);
return app;
}
}
@@ -27,6 +27,7 @@
import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.server.resourcemanager.MockNodes;
import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppImpl;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeAddedSchedulerEvent;
@@ -96,6 +97,8 @@ public void testSchedulingDelay() throws InterruptedException {
// and ANY requests
ApplicationAttemptId appAttemptId =
createAppAttemptId(this.APP_ID++, this.ATTEMPT_ID++);
createMockRMApp(appAttemptId);

scheduler.addApplication(appAttemptId.getApplicationId(), "queue11", "user11", false);
scheduler.addApplicationAttempt(appAttemptId, false, false);
List<ResourceRequest> ask = new ArrayList<>();
@@ -1379,12 +1379,16 @@ public void testQueueDemandCalculation() throws Exception {
scheduler.reinitialize(conf, resourceManager.getRMContext());

ApplicationAttemptId id11 = createAppAttemptId(1, 1);
createMockRMApp(id11);
scheduler.addApplication(id11.getApplicationId(), "root.queue1", "user1", false);
scheduler.addApplicationAttempt(id11, false, false);
ApplicationAttemptId id21 = createAppAttemptId(2, 1);
createMockRMApp(id21);
scheduler.addApplication(id21.getApplicationId(), "root.queue2", "user1", false);
scheduler.addApplicationAttempt(id21, false, false);
ApplicationAttemptId id22 = createAppAttemptId(2, 2);
createMockRMApp(id22);

scheduler.addApplication(id22.getApplicationId(), "root.queue2", "user1", false);
scheduler.addApplicationAttempt(id22, false, false);
@@ -2717,9 +2721,13 @@ public void testMultipleNodesSingleRackRequest() throws Exception {
NodeAddedSchedulerEvent nodeEvent2 = new NodeAddedSchedulerEvent(node2);
scheduler.handle(nodeEvent2);

ApplicationAttemptId appId = createAppAttemptId(this.APP_ID++, this.ATTEMPT_ID++);
scheduler.addApplication(appId.getApplicationId(), "queue1", "user1", false);
scheduler.addApplicationAttempt(appId, false, false);
ApplicationAttemptId attemptId =
createAppAttemptId(this.APP_ID++, this.ATTEMPT_ID++);
createMockRMApp(attemptId);

scheduler.addApplication(attemptId.getApplicationId(), "queue1", "user1",
false);
scheduler.addApplicationAttempt(attemptId, false, false);

// 1 request with 2 nodes on the same rack. another request with 1 node on
// a different rack
@@ -2731,21 +2739,24 @@ public void testMultipleNodesSingleRackRequest() throws Exception {
asks.add(createResourceRequest(1024, node3.getRackName(), 1, 1, true));
asks.add(createResourceRequest(1024, ResourceRequest.ANY, 1, 2, true));

scheduler.allocate(appId, asks, new ArrayList<ContainerId>(), null, null);
scheduler.allocate(attemptId, asks, new ArrayList<ContainerId>(), null,
null);

// node 1 checks in
scheduler.update();
NodeUpdateSchedulerEvent updateEvent1 = new NodeUpdateSchedulerEvent(node1);
scheduler.handle(updateEvent1);
// should assign node local
assertEquals(1, scheduler.getSchedulerApp(appId).getLiveContainers().size());
assertEquals(1, scheduler.getSchedulerApp(attemptId).getLiveContainers()
.size());

// node 2 checks in
scheduler.update();
NodeUpdateSchedulerEvent updateEvent2 = new NodeUpdateSchedulerEvent(node2);
scheduler.handle(updateEvent2);
// should assign rack local
assertEquals(2, scheduler.getSchedulerApp(appId).getLiveContainers().size());
assertEquals(2, scheduler.getSchedulerApp(attemptId).getLiveContainers()
.size());
}

@Test (timeout = 5000)
@@ -3855,6 +3866,8 @@ public void testContinuousScheduling() throws Exception {
// send application request
ApplicationAttemptId appAttemptId =
createAppAttemptId(this.APP_ID++, this.ATTEMPT_ID++);
createMockRMApp(appAttemptId);

scheduler.addApplication(appAttemptId.getApplicationId(), "queue11", "user11", false);
scheduler.addApplicationAttempt(appAttemptId, false, false);
List<ResourceRequest> ask = new ArrayList<ResourceRequest>();
@@ -21,6 +21,7 @@
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

import java.io.IOException;
import java.lang.reflect.Method;
@@ -57,6 +58,9 @@
import org.apache.hadoop.yarn.server.resourcemanager.Task;
import org.apache.hadoop.yarn.server.resourcemanager.ahs.RMApplicationHistoryWriter;
import org.apache.hadoop.yarn.server.resourcemanager.metrics.SystemMetricsPublisher;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppImpl;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptImpl;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptMetrics;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
@@ -212,9 +216,12 @@ public void testNodeLocalAssignment() throws Exception {
int _appAttemptId = 1;
ApplicationAttemptId appAttemptId = createAppAttemptId(_appId,
_appAttemptId);

createMockRMApp(appAttemptId, rmContext);

AppAddedSchedulerEvent appEvent =
new AppAddedSchedulerEvent(appAttemptId.getApplicationId(), "queue1",
"user1");
"user1");
scheduler.handle(appEvent);
AppAttemptAddedSchedulerEvent attemptEvent =
new AppAttemptAddedSchedulerEvent(appAttemptId, false);
@@ -310,6 +317,8 @@ public Map<NodeId, FiCaSchedulerNode> getNodes(){
int _appAttemptId = 1;
ApplicationAttemptId appAttemptId = createAppAttemptId(_appId,
_appAttemptId);
createMockRMApp(appAttemptId, rmContext);

AppAddedSchedulerEvent appEvent =
new AppAddedSchedulerEvent(appAttemptId.getApplicationId(), "queue1",
"user1");
@@ -574,6 +583,9 @@ public void testBlackListNodes() throws Exception {
ApplicationId appId = BuilderUtils.newApplicationId(100, 1);
ApplicationAttemptId appAttemptId = BuilderUtils.newApplicationAttemptId(
appId, 1);

createMockRMApp(appAttemptId, rm.getRMContext());

SchedulerEvent appEvent =
new AppAddedSchedulerEvent(appId, "default",
"user");
@@ -643,4 +655,18 @@ public static void main(String[] arg) throws Exception {
t.testFifoScheduler();
t.tearDown();
}

private RMAppImpl createMockRMApp(ApplicationAttemptId attemptId,
RMContext context) {
RMAppImpl app = mock(RMAppImpl.class);
when(app.getApplicationId()).thenReturn(attemptId.getApplicationId());
RMAppAttemptImpl attempt = mock(RMAppAttemptImpl.class);
when(attempt.getAppAttemptId()).thenReturn(attemptId);
RMAppAttemptMetrics attemptMetric = mock(RMAppAttemptMetrics.class);
when(attempt.getRMAppAttemptMetrics()).thenReturn(attemptMetric);
when(app.getCurrentAppAttempt()).thenReturn(attempt);
context.getRMApps().putIfAbsent(attemptId.getApplicationId(), app);
return app;
}

}
@@ -48,7 +48,7 @@ public class TestNodesPage {

// Number of Actual Table Headers for NodesPage.NodesBlock might change in
// future. In that case this value should be adjusted to the new value.
final int numberOfThInMetricsTable = 16;
final int numberOfThInMetricsTable = 20;
final int numberOfActualTableHeaders = 13;

private Injector injector;
@@ -347,7 +347,7 @@ private void verifySubQueue(JSONObject info, String q,
int numExpectedElements = 13;
boolean isParentQueue = true;
if (!info.has("queues")) {
numExpectedElements = 24;
numExpectedElements = 25;
isParentQueue = false;
}
assertEquals("incorrect number of elements", numExpectedElements, info.length());