YARN-4090. Make Collections.sort() more efficient by caching resource usage. (Contributed by Yufei Gu, Shilong Zhang and Xianyin Xin)

(cherry picked from commit 1f4cdf10681b6903207a63fb5c306c9665ed9464)
(cherry picked from commit 96106b8f5fe50e2d5c0c4df5dbddea4f89f278d9)
This commit is contained in:
Yufei Gu 2017-10-20 01:32:20 -07:00
parent a6370dde90
commit 01b2e8b9ca
5 changed files with 93 additions and 32 deletions

View File

@ -49,6 +49,7 @@
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEventType;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerFinishedEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerImpl;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ActiveUsersManager;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
@ -174,6 +175,7 @@ void containerCompleted(RMContainer rmContainer,
rmContainer.getNodeLabelExpression(),
getUser(), 1, containerResource);
this.attemptResourceUsage.decUsed(containerResource);
getQueue().decUsedResource(containerResource);
// Clear resource utilization metrics cache.
lastMemoryAggregateAllocationUpdateTime = -1;
@ -468,6 +470,7 @@ public RMContainer allocate(NodeType type, FSSchedulerNode node,
List<ResourceRequest> resourceRequestList = appSchedulingInfo.allocate(
type, node, schedulerKey, container);
this.attemptResourceUsage.incUsed(container.getResource());
getQueue().incUsedResource(container.getResource());
// Update resource requests related to "request" and store in RMContainer
((RMContainerImpl) rmContainer).setResourceRequests(resourceRequestList);
@ -651,6 +654,22 @@ private Container createContainer(FSSchedulerNode node, Resource capability,
schedulerKey.getAllocationRequestId());
}
@Override
public synchronized void recoverContainer(SchedulerNode node,
RMContainer rmContainer) {
try {
writeLock.lock();
super.recoverContainer(node, rmContainer);
if (!rmContainer.getState().equals(RMContainerState.COMPLETED)) {
getQueue().incUsedResource(rmContainer.getContainer().getResource());
}
} finally {
writeLock.unlock();
}
}
/**
* Reserve a spot for {@code container} on this {@code node}. If
* the container is {@code alreadyReserved} on the node, simply

View File

@ -90,6 +90,7 @@ void addApp(FSAppAttempt app, boolean runnable) {
} else {
nonRunnableApps.add(app);
}
incUsedResource(app.getResourceUsage());
} finally {
writeLock.unlock();
}
@ -124,6 +125,7 @@ boolean removeApp(FSAppAttempt app) {
getMetrics().setAMResourceUsage(amResourceUsage);
}
decUsedResource(app.getResourceUsage());
return runnable;
}
@ -293,23 +295,6 @@ public Resource getDemand() {
return demand;
}
@Override
public Resource getResourceUsage() {
Resource usage = Resources.createResource(0);
readLock.lock();
try {
for (FSAppAttempt app : runnableApps) {
Resources.addTo(usage, app.getResourceUsage());
}
for (FSAppAttempt app : nonRunnableApps) {
Resources.addTo(usage, app.getResourceUsage());
}
} finally {
readLock.unlock();
}
return usage;
}
Resource getAmResourceUsage() {
return amResourceUsage;
}

View File

@ -118,20 +118,6 @@ public Resource getDemand() {
}
}
@Override
public Resource getResourceUsage() {
Resource usage = Resources.createResource(0);
readLock.lock();
try {
for (FSQueue child : childQueues) {
Resources.addTo(usage, child.getResourceUsage());
}
} finally {
readLock.unlock();
}
return usage;
}
@Override
public void updateDemand() {
// Compute demand by iterating through apps in the queue

View File

@ -58,6 +58,7 @@ public abstract class FSQueue implements Queue, Schedulable {
private Resource fairShare = Resources.createResource(0, 0);
private Resource steadyFairShare = Resources.createResource(0, 0);
private Resource reservedResource = Resources.createResource(0, 0);
private final Resource resourceUsage = Resource.newInstance(0, 0);
private final String name;
protected final FairScheduler scheduler;
private final YarnAuthorizationProvider authorizer;
@ -479,6 +480,39 @@ public void incReservedResource(String nodeLabel, Resource resourceToInc) {
public void decReservedResource(String nodeLabel, Resource resourceToDec) {
}
@Override
public Resource getResourceUsage() {
return resourceUsage;
}
/**
* Increase resource usage for this queue and all parent queues.
*
* @param res the resource to increase
*/
protected void incUsedResource(Resource res) {
synchronized (resourceUsage) {
Resources.addTo(resourceUsage, res);
if (parent != null) {
parent.incUsedResource(res);
}
}
}
/**
* Decrease resource usage for this queue and all parent queues.
*
* @param res the resource to decrease
*/
protected void decUsedResource(Resource res) {
synchronized (resourceUsage) {
Resources.subtractFrom(resourceUsage, res);
if (parent != null) {
parent.decUsedResource(res);
}
}
}
@Override
public Priority getDefaultApplicationPriority() {
// TODO add implementation for FSParentQueue

View File

@ -4584,6 +4584,43 @@ public void testAddAndRemoveAppFromFairScheduler() throws Exception {
TestSchedulerUtils.verifyAppAddedAndRemovedFromScheduler(
scheduler.getSchedulerApplications(), scheduler, "default");
}
@Test
public void testResourceUsageByMoveApp() throws Exception {
scheduler.init(conf);
scheduler.start();
scheduler.reinitialize(conf, resourceManager.getRMContext());
RMNode node1 = MockNodes.newNodeInfo(
1, Resources.createResource(1 * GB, 4), 1, "127.0.0.1");
NodeAddedSchedulerEvent nodeEvent1 = new NodeAddedSchedulerEvent(node1);
scheduler.handle(nodeEvent1);
ApplicationAttemptId appAttId =
createSchedulingRequest(1 * GB, 2, "parent1.queue1", "user1", 2);
scheduler.update();
NodeUpdateSchedulerEvent updateEvent = new NodeUpdateSchedulerEvent(node1);
scheduler.handle(updateEvent);
QueueManager queueMgr = scheduler.getQueueManager();
FSQueue parent1 = queueMgr.getParentQueue("parent1", true);
FSQueue parent2 = queueMgr.getParentQueue("parent2", true);
FSQueue queue2 = queueMgr.getLeafQueue("parent2.queue2", true);
FSQueue queue1 = queueMgr.getLeafQueue("parent1.queue1", true);
Assert.assertEquals(parent2.getResourceUsage().getMemorySize(), 0);
Assert.assertEquals(queue2.getResourceUsage().getMemorySize(), 0);
Assert.assertEquals(parent1.getResourceUsage().getMemorySize(), 1 * GB);
Assert.assertEquals(queue1.getResourceUsage().getMemorySize(), 1 * GB);
scheduler.moveApplication(appAttId.getApplicationId(), "parent2.queue2");
Assert.assertEquals(parent2.getResourceUsage().getMemorySize(), 1 * GB);
Assert.assertEquals(queue2.getResourceUsage().getMemorySize(), 1 * GB);
Assert.assertEquals(parent1.getResourceUsage().getMemorySize(), 0);
Assert.assertEquals(queue1.getResourceUsage().getMemorySize(), 0);
}
@Test (expected = YarnException.class)
public void testMoveWouldViolateMaxAppsConstraints() throws Exception {
@ -4597,7 +4634,7 @@ public void testMoveWouldViolateMaxAppsConstraints() throws Exception {
ApplicationAttemptId appAttId =
createSchedulingRequest(1024, 1, "queue1", "user1", 3);
scheduler.moveApplication(appAttId.getApplicationId(), "queue2");
}