YARN-4090. Make Collections.sort() more efficient by caching resource usage. (Contributed by Yufei Gu, Shilong Zhang and Xianyin Xin)

(cherry picked from commit 1f4cdf1068)
(cherry picked from commit 96106b8f5f)
This commit is contained in:
Yufei Gu 2017-10-20 01:32:20 -07:00 committed by vrushali
parent c9f6a98b82
commit 4c4f28c9e7
5 changed files with 93 additions and 32 deletions

View File

@ -49,6 +49,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEven
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEventType;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerFinishedEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerImpl;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ActiveUsersManager;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
@ -174,6 +175,7 @@ public class FSAppAttempt extends SchedulerApplicationAttempt
rmContainer.getNodeLabelExpression(),
getUser(), 1, containerResource);
this.attemptResourceUsage.decUsed(containerResource);
getQueue().decUsedResource(containerResource);
// Clear resource utilization metrics cache.
lastMemoryAggregateAllocationUpdateTime = -1;
@ -468,6 +470,7 @@ public class FSAppAttempt extends SchedulerApplicationAttempt
List<ResourceRequest> resourceRequestList = appSchedulingInfo.allocate(
type, node, schedulerKey, container);
this.attemptResourceUsage.incUsed(container.getResource());
getQueue().incUsedResource(container.getResource());
// Update resource requests related to "request" and store in RMContainer
((RMContainerImpl) rmContainer).setResourceRequests(resourceRequestList);
@ -651,6 +654,22 @@ public class FSAppAttempt extends SchedulerApplicationAttempt
schedulerKey.getAllocationRequestId());
}
@Override
public synchronized void recoverContainer(SchedulerNode node,
RMContainer rmContainer) {
try {
writeLock.lock();
super.recoverContainer(node, rmContainer);
if (!rmContainer.getState().equals(RMContainerState.COMPLETED)) {
getQueue().incUsedResource(rmContainer.getContainer().getResource());
}
} finally {
writeLock.unlock();
}
}
/**
* Reserve a spot for {@code container} on this {@code node}. If
* the container is {@code alreadyReserved} on the node, simply

View File

@ -90,6 +90,7 @@ public class FSLeafQueue extends FSQueue {
} else {
nonRunnableApps.add(app);
}
incUsedResource(app.getResourceUsage());
} finally {
writeLock.unlock();
}
@ -124,6 +125,7 @@ public class FSLeafQueue extends FSQueue {
getMetrics().setAMResourceUsage(amResourceUsage);
}
decUsedResource(app.getResourceUsage());
return runnable;
}
@ -293,23 +295,6 @@ public class FSLeafQueue extends FSQueue {
return demand;
}
@Override
public Resource getResourceUsage() {
Resource usage = Resources.createResource(0);
readLock.lock();
try {
for (FSAppAttempt app : runnableApps) {
Resources.addTo(usage, app.getResourceUsage());
}
for (FSAppAttempt app : nonRunnableApps) {
Resources.addTo(usage, app.getResourceUsage());
}
} finally {
readLock.unlock();
}
return usage;
}
Resource getAmResourceUsage() {
return amResourceUsage;
}

View File

@ -118,20 +118,6 @@ public class FSParentQueue extends FSQueue {
}
}
@Override
public Resource getResourceUsage() {
Resource usage = Resources.createResource(0);
readLock.lock();
try {
for (FSQueue child : childQueues) {
Resources.addTo(usage, child.getResourceUsage());
}
} finally {
readLock.unlock();
}
return usage;
}
@Override
public void updateDemand() {
// Compute demand by iterating through apps in the queue

View File

@ -58,6 +58,7 @@ public abstract class FSQueue implements Queue, Schedulable {
private Resource fairShare = Resources.createResource(0, 0);
private Resource steadyFairShare = Resources.createResource(0, 0);
private Resource reservedResource = Resources.createResource(0, 0);
private final Resource resourceUsage = Resource.newInstance(0, 0);
private final String name;
protected final FairScheduler scheduler;
private final YarnAuthorizationProvider authorizer;
@ -479,6 +480,39 @@ public abstract class FSQueue implements Queue, Schedulable {
public void decReservedResource(String nodeLabel, Resource resourceToDec) {
}
@Override
public Resource getResourceUsage() {
return resourceUsage;
}
/**
* Increase resource usage for this queue and all parent queues.
*
* @param res the resource to increase
*/
protected void incUsedResource(Resource res) {
synchronized (resourceUsage) {
Resources.addTo(resourceUsage, res);
if (parent != null) {
parent.incUsedResource(res);
}
}
}
/**
* Decrease resource usage for this queue and all parent queues.
*
* @param res the resource to decrease
*/
protected void decUsedResource(Resource res) {
synchronized (resourceUsage) {
Resources.subtractFrom(resourceUsage, res);
if (parent != null) {
parent.decUsedResource(res);
}
}
}
@Override
public Priority getDefaultApplicationPriority() {
// TODO add implementation for FSParentQueue

View File

@ -4585,6 +4585,43 @@ public class TestFairScheduler extends FairSchedulerTestBase {
scheduler.getSchedulerApplications(), scheduler, "default");
}
@Test
public void testResourceUsageByMoveApp() throws Exception {
scheduler.init(conf);
scheduler.start();
scheduler.reinitialize(conf, resourceManager.getRMContext());
RMNode node1 = MockNodes.newNodeInfo(
1, Resources.createResource(1 * GB, 4), 1, "127.0.0.1");
NodeAddedSchedulerEvent nodeEvent1 = new NodeAddedSchedulerEvent(node1);
scheduler.handle(nodeEvent1);
ApplicationAttemptId appAttId =
createSchedulingRequest(1 * GB, 2, "parent1.queue1", "user1", 2);
scheduler.update();
NodeUpdateSchedulerEvent updateEvent = new NodeUpdateSchedulerEvent(node1);
scheduler.handle(updateEvent);
QueueManager queueMgr = scheduler.getQueueManager();
FSQueue parent1 = queueMgr.getParentQueue("parent1", true);
FSQueue parent2 = queueMgr.getParentQueue("parent2", true);
FSQueue queue2 = queueMgr.getLeafQueue("parent2.queue2", true);
FSQueue queue1 = queueMgr.getLeafQueue("parent1.queue1", true);
Assert.assertEquals(parent2.getResourceUsage().getMemorySize(), 0);
Assert.assertEquals(queue2.getResourceUsage().getMemorySize(), 0);
Assert.assertEquals(parent1.getResourceUsage().getMemorySize(), 1 * GB);
Assert.assertEquals(queue1.getResourceUsage().getMemorySize(), 1 * GB);
scheduler.moveApplication(appAttId.getApplicationId(), "parent2.queue2");
Assert.assertEquals(parent2.getResourceUsage().getMemorySize(), 1 * GB);
Assert.assertEquals(queue2.getResourceUsage().getMemorySize(), 1 * GB);
Assert.assertEquals(parent1.getResourceUsage().getMemorySize(), 0);
Assert.assertEquals(queue1.getResourceUsage().getMemorySize(), 0);
}
@Test (expected = YarnException.class)
public void testMoveWouldViolateMaxAppsConstraints() throws Exception {
scheduler.init(conf);