MAPREDUCE-3641. Making CapacityScheduler more conservative so as to assign only one off-switch container in a single scheduling iteration. Contributed by Arun C Murthy.

svn merge --ignore-ancestry -c 1232182 ../../trunk/


git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-0.23@1232183 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Vinod Kumar Vavilapalli 2012-01-16 21:57:13 +00:00
parent d49f305f0b
commit 926fd5463a
8 changed files with 239 additions and 57 deletions

View File

@ -115,6 +115,10 @@ Release 0.23.1 - Unreleased
MAPREDUCE-3553. Add support for data returned when exceptions thrown from web
service apis to be in either xml or in JSON. (Thomas Graves via mahadev)
MAPREDUCE-3641. Making CapacityScheduler more conservative so as to
assign only one off-switch container in a single scheduling
iteration. (Arun C Murthy via vinodkv)
OPTIMIZATIONS
MAPREDUCE-3567. Extraneous JobConf objects in AM heap. (Vinod Kumar

View File

@ -26,8 +26,26 @@
@Private
@Evolving
public class Resources {
// Java doesn't have const :(
private static final Resource NONE = createResource(0);
private static final Resource NONE = new Resource() {
@Override
public int getMemory() {
return 0;
}
@Override
public void setMemory(int memory) {
throw new RuntimeException("NONE cannot be modified!");
}
@Override
public int compareTo(Resource o) {
return (0 - o.getMemory());
}
};
public static Resource createResource(int memory) {
Resource resource = Records.newRecord(Resource.class);
@ -36,7 +54,6 @@ public static Resource createResource(int memory) {
}
/**
 * Returns the shared, immutable zero-resource singleton used to mean
 * "no resource assigned".
 *
 * @return the {@code NONE} singleton (0 memory); callers must not mutate it
 */
public static Resource none() {
// Guard against accidental mutation of the shared singleton elsewhere.
assert NONE.getMemory() == 0 : "NONE should be empty";
return NONE;
}

View File

@ -0,0 +1,48 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType;
@Private
@Unstable
public class CSAssignment {

  /** Resource handed out in this scheduling pass. */
  private final Resource resource;

  /** Locality of the container this assignment represents. */
  private final NodeType type;

  /**
   * Records the outcome of one scheduling pass.
   *
   * @param resource the resource assigned in this pass
   * @param type the locality (node-local, rack-local or off-switch) of the
   *             assignment
   */
  public CSAssignment(Resource resource, NodeType type) {
    this.resource = resource;
    this.type = type;
  }

  /** @return the resource assigned in this pass */
  public Resource getResource() {
    return resource;
  }

  /** @return the locality of the assignment */
  public NodeType getType() {
    return type;
  }

  @Override
  public String toString() {
    // Same rendering as simple concatenation: "<memory>:<type>".
    StringBuilder sb = new StringBuilder();
    sb.append(resource.getMemory()).append(':').append(type);
    return sb.toString();
  }
}

View File

@ -155,9 +155,10 @@ public void submitApplication(SchedulerApp application, String user,
* Assign containers to applications in the queue or it's children (if any).
* @param clusterResource the resource of the cluster.
* @param node node on which resources are available
* @return the resource that is being assigned.
* @return the assignment
*/
public Resource assignContainers(Resource clusterResource, SchedulerNode node);
public CSAssignment assignContainers(
Resource clusterResource, SchedulerNode node);
/**
* A container assigned to the queue has completed.

View File

@ -19,7 +19,6 @@
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collections;
@ -35,7 +34,6 @@
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.security.AccessControlException;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.authorize.AccessControlList;
@ -703,8 +701,11 @@ private synchronized SchedulerApp getApplication(
return applicationsMap.get(applicationAttemptId);
}
private static final CSAssignment NULL_ASSIGNMENT =
new CSAssignment(Resources.createResource(0), NodeType.NODE_LOCAL);
@Override
public synchronized Resource
public synchronized CSAssignment
assignContainers(Resource clusterResource, SchedulerNode node) {
if(LOG.isDebugEnabled()) {
@ -717,8 +718,11 @@ private synchronized SchedulerApp getApplication(
if (reservedContainer != null) {
SchedulerApp application =
getApplication(reservedContainer.getApplicationAttemptId());
return assignReservedContainer(application, node, reservedContainer,
clusterResource);
return new CSAssignment(
assignReservedContainer(application, node, reservedContainer,
clusterResource),
NodeType.NODE_LOCAL); // Don't care about locality constraints
// for reserved containers
}
// Try to assign containers to applications in order
@ -746,7 +750,7 @@ private synchronized SchedulerApp getApplication(
// Are we going over limits by allocating to this application?
// Maximum Capacity of the queue
if (!assignToQueue(clusterResource, required)) {
return Resources.none();
return NULL_ASSIGNMENT;
}
// User limits
@ -760,24 +764,23 @@ private synchronized SchedulerApp getApplication(
application.addSchedulingOpportunity(priority);
// Try to schedule
Resource assigned =
CSAssignment assignment =
assignContainersOnNode(clusterResource, node, application, priority,
null);
Resource assigned = assignment.getResource();
// Did we schedule or reserve a container?
if (Resources.greaterThan(assigned, Resources.none())) {
Resource assignedResource =
application.getResourceRequest(priority, RMNode.ANY).getCapability();
// Book-keeping
allocateResource(clusterResource,
application, assignedResource);
allocateResource(clusterResource, application, assigned);
// Reset scheduling opportunities
application.resetSchedulingOpportunities(priority);
// Done
return assignedResource;
return assignment;
} else {
// Do not assign out of order w.r.t priorities
break;
@ -792,7 +795,7 @@ private synchronized SchedulerApp getApplication(
application.showRequests();
}
return Resources.none();
return NULL_ASSIGNMENT;
}
@ -809,11 +812,12 @@ private synchronized Resource assignReservedContainer(SchedulerApp application,
container.getId(),
SchedulerUtils.UNRESERVED_CONTAINER),
RMContainerEventType.RELEASED);
return container.getResource();
return container.getResource(); // Ugh, return resource to force re-sort
}
// Try to assign if we have sufficient resources
assignContainersOnNode(clusterResource, node, application, priority, rmContainer);
assignContainersOnNode(clusterResource, node, application, priority,
rmContainer);
// Doesn't matter... since it's already charged for at time of reservation
// "re-reservation" is *free*
@ -966,7 +970,7 @@ boolean needContainers(SchedulerApp application, Priority priority, Resource req
return (((starvation + requiredContainers) - reservedContainers) > 0);
}
private Resource assignContainersOnNode(Resource clusterResource,
private CSAssignment assignContainersOnNode(Resource clusterResource,
SchedulerNode node, SchedulerApp application,
Priority priority, RMContainer reservedContainer) {
@ -977,7 +981,7 @@ private Resource assignContainersOnNode(Resource clusterResource,
assignNodeLocalContainers(clusterResource, node, application, priority,
reservedContainer);
if (Resources.greaterThan(assigned, Resources.none())) {
return assigned;
return new CSAssignment(assigned, NodeType.NODE_LOCAL);
}
// Rack-local
@ -985,12 +989,14 @@ private Resource assignContainersOnNode(Resource clusterResource,
assignRackLocalContainers(clusterResource, node, application, priority,
reservedContainer);
if (Resources.greaterThan(assigned, Resources.none())) {
return assigned;
return new CSAssignment(assigned, NodeType.RACK_LOCAL);
}
// Off-switch
return assignOffSwitchContainers(clusterResource, node, application,
priority, reservedContainer);
return new CSAssignment(
assignOffSwitchContainers(clusterResource, node, application,
priority, reservedContainer),
NodeType.OFF_SWITCH);
}
private Resource assignNodeLocalContainers(Resource clusterResource,
@ -1272,7 +1278,7 @@ synchronized void allocateResource(Resource clusterResource,
metrics.setAvailableResourcesToUser(userName, application.getHeadroom());
LOG.info(getQueueName() +
" used=" + usedResources + " numContainers=" + numContainers +
" user=" + userName + " resources=" + user.getConsumedResources());
" user=" + userName + " user-resources=" + user.getConsumedResources());
}
synchronized void releaseResource(Resource clusterResource,
@ -1290,7 +1296,7 @@ synchronized void releaseResource(Resource clusterResource,
LOG.info(getQueueName() +
" used=" + usedResources + " numContainers=" + numContainers +
" user=" + userName + " resources=" + user.getConsumedResources());
" user=" + userName + " user-resources=" + user.getConsumedResources());
}
@Override

View File

@ -48,6 +48,7 @@
import org.apache.hadoop.yarn.server.resourcemanager.resource.Resources;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEventType;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApp;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode;
@ -500,9 +501,11 @@ synchronized void setMaxCapacity(float maximumCapacity) {
}
@Override
public synchronized Resource assignContainers(
public synchronized CSAssignment assignContainers(
Resource clusterResource, SchedulerNode node) {
Resource assigned = Resources.createResource(0);
CSAssignment assignment =
new CSAssignment(Resources.createResource(0), NodeType.NODE_LOCAL);
boolean assignedOffSwitch = false;
while (canAssign(node)) {
if (LOG.isDebugEnabled()) {
@ -516,16 +519,18 @@ public synchronized Resource assignContainers(
}
// Schedule
Resource assignedToChild =
CSAssignment assignedToChild =
assignContainersToChildQueues(clusterResource, node);
assignedOffSwitch = (assignedToChild.getType() == NodeType.OFF_SWITCH);
// Done if no child-queue assigned anything
if (Resources.greaterThan(assignedToChild, Resources.none())) {
if (Resources.greaterThan(assignedToChild.getResource(),
Resources.none())) {
// Track resource utilization for the parent-queue
allocateResource(clusterResource, assignedToChild);
allocateResource(clusterResource, assignedToChild.getResource());
// Track resource utilization in this pass of the scheduler
Resources.addTo(assigned, assignedToChild);
Resources.addTo(assignment.getResource(), assignedToChild.getResource());
LOG.info("assignedContainer" +
" queue=" + getQueueName() +
@ -539,17 +544,26 @@ public synchronized Resource assignContainers(
if (LOG.isDebugEnabled()) {
LOG.debug("ParentQ=" + getQueueName()
+ " assignedSoFarInThisIteration=" + assigned
+ " assignedSoFarInThisIteration=" + assignment.getResource()
+ " utilization=" + getUtilization());
}
// Do not assign more than one container if this isn't the root queue
if (!rootQueue) {
// or if we've already assigned an off-switch container
if (rootQueue) {
if (assignedOffSwitch) {
if (LOG.isDebugEnabled()) {
LOG.debug("Not assigning more than one off-switch container," +
" assignments so far: " + assignment);
}
break;
}
} else {
break;
}
}
return assigned;
return assignment;
}
private synchronized boolean assignToQueue(Resource clusterResource) {
@ -573,9 +587,10 @@ private boolean canAssign(SchedulerNode node) {
minimumAllocation);
}
synchronized Resource assignContainersToChildQueues(Resource cluster,
synchronized CSAssignment assignContainersToChildQueues(Resource cluster,
SchedulerNode node) {
Resource assigned = Resources.createResource(0);
CSAssignment assignment =
new CSAssignment(Resources.createResource(0), NodeType.NODE_LOCAL);
printChildQueues();
@ -586,25 +601,28 @@ synchronized Resource assignContainersToChildQueues(Resource cluster,
LOG.debug("Trying to assign to queue: " + childQueue.getQueuePath()
+ " stats: " + childQueue);
}
assigned = childQueue.assignContainers(cluster, node);
assignment = childQueue.assignContainers(cluster, node);
if(LOG.isDebugEnabled()) {
LOG.debug("Assignedto queue: " + childQueue.getQueuePath()
+ " stats: " + childQueue + " --> " + assigned.getMemory());
LOG.debug("Assigned to queue: " + childQueue.getQueuePath() +
" stats: " + childQueue + " --> " +
assignment.getResource().getMemory() + ", " + assignment.getType());
}
// If we do assign, remove the queue and re-insert in-order to re-sort
if (Resources.greaterThan(assigned, Resources.none())) {
if (Resources.greaterThan(assignment.getResource(), Resources.none())) {
// Remove and re-insert to sort
iter.remove();
LOG.info("Re-sorting queues since queue: " + childQueue.getQueuePath() +
" stats: " + childQueue);
childQueues.add(childQueue);
printChildQueues();
if (LOG.isDebugEnabled()) {
printChildQueues();
}
break;
}
}
return assigned;
return assignment;
}
String getChildQueuesToPrint() {

View File

@ -811,49 +811,56 @@ public void testLocalityScheduling() throws Exception {
app_0.updateResourceRequests(app_0_requests_0);
// Start testing...
CSAssignment assignment = null;
// Start with off switch, shouldn't allocate due to delay scheduling
a.assignContainers(clusterResource, node_2);
assignment = a.assignContainers(clusterResource, node_2);
verify(app_0, never()).allocate(any(NodeType.class), eq(node_2),
any(Priority.class), any(ResourceRequest.class), any(Container.class));
assertEquals(1, app_0.getSchedulingOpportunities(priority));
assertEquals(3, app_0.getTotalRequiredResources(priority));
assertEquals(NodeType.NODE_LOCAL, assignment.getType()); // None->NODE_LOCAL
// Another off switch, shouldn't allocate due to delay scheduling
a.assignContainers(clusterResource, node_2);
assignment = a.assignContainers(clusterResource, node_2);
verify(app_0, never()).allocate(any(NodeType.class), eq(node_2),
any(Priority.class), any(ResourceRequest.class), any(Container.class));
assertEquals(2, app_0.getSchedulingOpportunities(priority));
assertEquals(3, app_0.getTotalRequiredResources(priority));
assertEquals(NodeType.NODE_LOCAL, assignment.getType()); // None->NODE_LOCAL
// Another off switch, shouldn't allocate due to delay scheduling
a.assignContainers(clusterResource, node_2);
assignment = a.assignContainers(clusterResource, node_2);
verify(app_0, never()).allocate(any(NodeType.class), eq(node_2),
any(Priority.class), any(ResourceRequest.class), any(Container.class));
assertEquals(3, app_0.getSchedulingOpportunities(priority));
assertEquals(3, app_0.getTotalRequiredResources(priority));
assertEquals(NodeType.NODE_LOCAL, assignment.getType()); // None->NODE_LOCAL
// Another off switch, now we should allocate
// since missedOpportunities=3 and reqdContainers=3
a.assignContainers(clusterResource, node_2);
assignment = a.assignContainers(clusterResource, node_2);
verify(app_0).allocate(eq(NodeType.OFF_SWITCH), eq(node_2),
any(Priority.class), any(ResourceRequest.class), any(Container.class));
assertEquals(0, app_0.getSchedulingOpportunities(priority)); // should reset
assertEquals(2, app_0.getTotalRequiredResources(priority));
assertEquals(NodeType.OFF_SWITCH, assignment.getType());
// NODE_LOCAL - node_0
a.assignContainers(clusterResource, node_0);
assignment = a.assignContainers(clusterResource, node_0);
verify(app_0).allocate(eq(NodeType.NODE_LOCAL), eq(node_0),
any(Priority.class), any(ResourceRequest.class), any(Container.class));
assertEquals(0, app_0.getSchedulingOpportunities(priority)); // should reset
assertEquals(1, app_0.getTotalRequiredResources(priority));
assertEquals(NodeType.NODE_LOCAL, assignment.getType());
// NODE_LOCAL - node_1
a.assignContainers(clusterResource, node_1);
assignment = a.assignContainers(clusterResource, node_1);
verify(app_0).allocate(eq(NodeType.NODE_LOCAL), eq(node_1),
any(Priority.class), any(ResourceRequest.class), any(Container.class));
assertEquals(0, app_0.getSchedulingOpportunities(priority)); // should reset
assertEquals(0, app_0.getTotalRequiredResources(priority));
assertEquals(NodeType.NODE_LOCAL, assignment.getType());
// Add 1 more request to check for RACK_LOCAL
app_0_requests_0.clear();
@ -872,11 +879,12 @@ public void testLocalityScheduling() throws Exception {
String host_3 = "host_3"; // on rack_1
SchedulerNode node_3 = TestUtils.getMockNode(host_3, rack_1, 0, 8*GB);
a.assignContainers(clusterResource, node_3);
assignment = a.assignContainers(clusterResource, node_3);
verify(app_0).allocate(eq(NodeType.RACK_LOCAL), eq(node_3),
any(Priority.class), any(ResourceRequest.class), any(Container.class));
assertEquals(0, app_0.getSchedulingOpportunities(priority)); // should reset
assertEquals(0, app_0.getTotalRequiredResources(priority));
assertEquals(NodeType.RACK_LOCAL, assignment.getType());
}
@Test

View File

@ -29,6 +29,7 @@
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import org.apache.hadoop.yarn.server.resourcemanager.resource.Resources;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApp;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode;
@ -92,11 +93,18 @@ private SchedulerApp getMockApplication(int appId, String user) {
private void stubQueueAllocation(final CSQueue queue,
final Resource clusterResource, final SchedulerNode node,
final int allocation) {
stubQueueAllocation(queue, clusterResource, node, allocation,
NodeType.NODE_LOCAL);
}
private void stubQueueAllocation(final CSQueue queue,
final Resource clusterResource, final SchedulerNode node,
final int allocation, final NodeType type) {
// Simulate the queue allocation
doAnswer(new Answer<Resource>() {
doAnswer(new Answer<CSAssignment>() {
@Override
public Resource answer(InvocationOnMock invocation) throws Throwable {
public CSAssignment answer(InvocationOnMock invocation) throws Throwable {
try {
throw new Exception();
} catch (Exception e) {
@ -115,8 +123,8 @@ public Resource answer(InvocationOnMock invocation) throws Throwable {
// Next call - nothing
if (allocation > 0) {
doReturn(Resources.none()).when(queue).assignContainers(
eq(clusterResource), eq(node));
doReturn(new CSAssignment(Resources.none(), type)).
when(queue).assignContainers(eq(clusterResource), eq(node));
// Mock the node's resource availability
Resource available = node.getAvailableResource();
@ -124,7 +132,7 @@ public Resource answer(InvocationOnMock invocation) throws Throwable {
when(node).getAvailableResource();
}
return allocatedResource;
return new CSAssignment(allocatedResource, type);
}
}).
when(queue).assignContainers(eq(clusterResource), eq(node));
@ -401,6 +409,78 @@ public void testMultiLevelQueues() throws Exception {
}
/**
 * Verifies that a parent queue stops handing out containers in a single
 * scheduling iteration once a child returns an OFF_SWITCH assignment,
 * while NODE_LOCAL/RACK_LOCAL assignments allow the iteration to continue.
 */
@Test
public void testOffSwitchScheduling() throws Exception {
// Setup queue configs
setupSingleLevelQueues(csConf);
Map<String, CSQueue> queues = new HashMap<String, CSQueue>();
CSQueue root =
CapacityScheduler.parseQueue(csContext, csConf, null,
CapacitySchedulerConfiguration.ROOT, queues, queues,
CapacityScheduler.queueComparator,
CapacityScheduler.applicationComparator,
TestUtils.spyHook);
// Setup some nodes
final int memoryPerNode = 10;
final int numNodes = 2;
SchedulerNode node_0 =
TestUtils.getMockNode("host_0", DEFAULT_RACK, 0, memoryPerNode*GB);
SchedulerNode node_1 =
TestUtils.getMockNode("host_1", DEFAULT_RACK, 0, memoryPerNode*GB);
final Resource clusterResource =
Resources.createResource(numNodes * (memoryPerNode*GB));
when(csContext.getNumClusterNodes()).thenReturn(numNodes);
// Start testing
LeafQueue a = (LeafQueue)queues.get(A);
LeafQueue b = (LeafQueue)queues.get(B);
final float delta = 0.0001f;
// Simulate B returning a container on node_0; A assigns nothing.
// B's assignment is OFF_SWITCH, so the iteration ends after it.
stubQueueAllocation(a, clusterResource, node_0, 0*GB, NodeType.OFF_SWITCH);
stubQueueAllocation(b, clusterResource, node_0, 1*GB, NodeType.OFF_SWITCH);
root.assignContainers(clusterResource, node_0);
assertEquals(0.0f, a.getUtilization(), delta);
assertEquals(computeQueueUtilization(b, 1*GB, clusterResource),
b.getUtilization(), delta);
// Now, A should get the scheduling opportunity since A=0G/6G, B=1G/14G
// also, B gets a scheduling opportunity since A allocates RACK_LOCAL
// (RACK_LOCAL does not terminate the iteration the way OFF_SWITCH does)
stubQueueAllocation(a, clusterResource, node_1, 2*GB, NodeType.RACK_LOCAL);
stubQueueAllocation(b, clusterResource, node_1, 1*GB, NodeType.OFF_SWITCH);
root.assignContainers(clusterResource, node_1);
InOrder allocationOrder = inOrder(a, b);
allocationOrder.verify(a).assignContainers(eq(clusterResource),
any(SchedulerNode.class));
allocationOrder.verify(b).assignContainers(eq(clusterResource),
any(SchedulerNode.class));
assertEquals(computeQueueUtilization(a, 2*GB, clusterResource),
a.getUtilization(), delta);
assertEquals(computeQueueUtilization(b, 2*GB, clusterResource),
b.getUtilization(), delta);
// Now, B should get the scheduling opportunity
// since A has 2/6G while B has 2/14G,
// However, since B returns off-switch, A won't get an opportunity
// (A's utilization stays at 2G below — only B's grows)
stubQueueAllocation(a, clusterResource, node_0, 1*GB, NodeType.NODE_LOCAL);
stubQueueAllocation(b, clusterResource, node_0, 2*GB, NodeType.OFF_SWITCH);
root.assignContainers(clusterResource, node_0);
allocationOrder = inOrder(b, a);
allocationOrder.verify(b).assignContainers(eq(clusterResource),
any(SchedulerNode.class));
allocationOrder.verify(a).assignContainers(eq(clusterResource),
any(SchedulerNode.class));
assertEquals(computeQueueUtilization(a, 2*GB, clusterResource),
a.getUtilization(), delta);
assertEquals(computeQueueUtilization(b, 4*GB, clusterResource),
b.getUtilization(), delta);
}
// JUnit lifecycle hook; intentionally empty — kept as a placeholder so
// per-test cleanup can be added later without touching every test.
@After
public void tearDown() throws Exception {
}