YARN-4090. Make Collections.sort() more efficient by caching resource usage. (Contributed by Yufei Gu, Shilong Zhang and Xianyin Xin)

This commit is contained in:
Yufei Gu 2017-10-20 01:32:20 -07:00
parent 4afd308b62
commit 1f4cdf1068
5 changed files with 93 additions and 32 deletions

View File

@ -48,6 +48,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEven
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEventType;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerFinishedEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerImpl;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ActiveUsersManager;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
@ -167,6 +168,7 @@ public class FSAppAttempt extends SchedulerApplicationAttempt
rmContainer.getNodeLabelExpression(),
getUser(), 1, containerResource);
this.attemptResourceUsage.decUsed(containerResource);
getQueue().decUsedResource(containerResource);
// Clear resource utilization metrics cache.
lastMemoryAggregateAllocationUpdateTime = -1;
@ -461,6 +463,7 @@ public class FSAppAttempt extends SchedulerApplicationAttempt
List<ResourceRequest> resourceRequestList = appSchedulingInfo.allocate(
type, node, schedulerKey, container);
this.attemptResourceUsage.incUsed(container.getResource());
getQueue().incUsedResource(container.getResource());
// Update resource requests related to "request" and store in RMContainer
((RMContainerImpl) rmContainer).setResourceRequests(resourceRequestList);
@ -644,6 +647,22 @@ public class FSAppAttempt extends SchedulerApplicationAttempt
schedulerKey.getAllocationRequestId());
}
@Override
public synchronized void recoverContainer(SchedulerNode node,
RMContainer rmContainer) {
try {
writeLock.lock();
super.recoverContainer(node, rmContainer);
if (!rmContainer.getState().equals(RMContainerState.COMPLETED)) {
getQueue().incUsedResource(rmContainer.getContainer().getResource());
}
} finally {
writeLock.unlock();
}
}
/**
* Reserve a spot for {@code container} on this {@code node}. If
* the container is {@code alreadyReserved} on the node, simply

View File

@ -89,6 +89,7 @@ public class FSLeafQueue extends FSQueue {
} else {
nonRunnableApps.add(app);
}
incUsedResource(app.getResourceUsage());
} finally {
writeLock.unlock();
}
@ -123,6 +124,7 @@ public class FSLeafQueue extends FSQueue {
getMetrics().setAMResourceUsage(amResourceUsage);
}
decUsedResource(app.getResourceUsage());
return runnable;
}
@ -292,23 +294,6 @@ public class FSLeafQueue extends FSQueue {
return demand;
}
@Override
public Resource getResourceUsage() {
Resource usage = Resources.createResource(0);
readLock.lock();
try {
for (FSAppAttempt app : runnableApps) {
Resources.addTo(usage, app.getResourceUsage());
}
for (FSAppAttempt app : nonRunnableApps) {
Resources.addTo(usage, app.getResourceUsage());
}
} finally {
readLock.unlock();
}
return usage;
}
Resource getAmResourceUsage() {
return amResourceUsage;
}

View File

@ -118,20 +118,6 @@ public class FSParentQueue extends FSQueue {
}
}
@Override
public Resource getResourceUsage() {
Resource usage = Resources.createResource(0);
readLock.lock();
try {
for (FSQueue child : childQueues) {
Resources.addTo(usage, child.getResourceUsage());
}
} finally {
readLock.unlock();
}
return usage;
}
@Override
public void updateDemand() {
// Compute demand by iterating through apps in the queue

View File

@ -57,6 +57,7 @@ public abstract class FSQueue implements Queue, Schedulable {
private Resource fairShare = Resources.createResource(0, 0);
private Resource steadyFairShare = Resources.createResource(0, 0);
private Resource reservedResource = Resources.createResource(0, 0);
private final Resource resourceUsage = Resource.newInstance(0, 0);
private final String name;
protected final FairScheduler scheduler;
private final YarnAuthorizationProvider authorizer;
@ -478,6 +479,39 @@ public abstract class FSQueue implements Queue, Schedulable {
public void decReservedResource(String nodeLabel, Resource resourceToDec) {
}
@Override
public Resource getResourceUsage() {
return resourceUsage;
}
/**
* Increase resource usage for this queue and all parent queues.
*
* @param res the resource to increase
*/
protected void incUsedResource(Resource res) {
synchronized (resourceUsage) {
Resources.addTo(resourceUsage, res);
if (parent != null) {
parent.incUsedResource(res);
}
}
}
/**
* Decrease resource usage for this queue and all parent queues.
*
* @param res the resource to decrease
*/
protected void decUsedResource(Resource res) {
synchronized (resourceUsage) {
Resources.subtractFrom(resourceUsage, res);
if (parent != null) {
parent.decUsedResource(res);
}
}
}
@Override
public Priority getDefaultApplicationPriority() {
// TODO add implementation for FSParentQueue

View File

@ -4585,6 +4585,43 @@ public class TestFairScheduler extends FairSchedulerTestBase {
TestSchedulerUtils.verifyAppAddedAndRemovedFromScheduler(
scheduler.getSchedulerApplications(), scheduler, "default");
}
@Test
public void testResourceUsageByMoveApp() throws Exception {
scheduler.init(conf);
scheduler.start();
scheduler.reinitialize(conf, resourceManager.getRMContext());
RMNode node1 = MockNodes.newNodeInfo(
1, Resources.createResource(1 * GB, 4), 1, "127.0.0.1");
NodeAddedSchedulerEvent nodeEvent1 = new NodeAddedSchedulerEvent(node1);
scheduler.handle(nodeEvent1);
ApplicationAttemptId appAttId =
createSchedulingRequest(1 * GB, 2, "parent1.queue1", "user1", 2);
scheduler.update();
NodeUpdateSchedulerEvent updateEvent = new NodeUpdateSchedulerEvent(node1);
scheduler.handle(updateEvent);
QueueManager queueMgr = scheduler.getQueueManager();
FSQueue parent1 = queueMgr.getParentQueue("parent1", true);
FSQueue parent2 = queueMgr.getParentQueue("parent2", true);
FSQueue queue2 = queueMgr.getLeafQueue("parent2.queue2", true);
FSQueue queue1 = queueMgr.getLeafQueue("parent1.queue1", true);
Assert.assertEquals(parent2.getResourceUsage().getMemorySize(), 0);
Assert.assertEquals(queue2.getResourceUsage().getMemorySize(), 0);
Assert.assertEquals(parent1.getResourceUsage().getMemorySize(), 1 * GB);
Assert.assertEquals(queue1.getResourceUsage().getMemorySize(), 1 * GB);
scheduler.moveApplication(appAttId.getApplicationId(), "parent2.queue2");
Assert.assertEquals(parent2.getResourceUsage().getMemorySize(), 1 * GB);
Assert.assertEquals(queue2.getResourceUsage().getMemorySize(), 1 * GB);
Assert.assertEquals(parent1.getResourceUsage().getMemorySize(), 0);
Assert.assertEquals(queue1.getResourceUsage().getMemorySize(), 0);
}
@Test (expected = YarnException.class)
public void testMoveWouldViolateMaxAppsConstraints() throws Exception {
@ -4598,7 +4635,7 @@ public class TestFairScheduler extends FairSchedulerTestBase {
ApplicationAttemptId appAttId =
createSchedulingRequest(1024, 1, "queue1", "user1", 3);
scheduler.moveApplication(appAttId.getApplicationId(), "queue2");
}