svn merge -c 1310056 from trunk. FIXES: MAPREDUCE-4073. CS assigns multiple off-switch containers when using multi-level-queues (Siddharth Seth via bobby)

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1310057 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Robert Joseph Evans 2012-04-05 20:29:01 +00:00
parent 0d388cd1f4
commit 401d4b3584
4 changed files with 78 additions and 9 deletions

View File

@ -162,6 +162,9 @@ Release 0.23.3 - UNRELEASED
MAPREDUCE-3621. TestDBJob and TestDataDrivenDBInputFormat ant tests fail MAPREDUCE-3621. TestDBJob and TestDataDrivenDBInputFormat ant tests fail
(Ravi Prakash via tgraves) (Ravi Prakash via tgraves)
MAPREDUCE-4073. CS assigns multiple off-switch containers when using
multi-level-queues (Siddharth Seth via bobby)
Release 0.23.2 - UNRELEASED Release 0.23.2 - UNRELEASED
INCOMPATIBLE CHANGES INCOMPATIBLE CHANGES

View File

@ -26,7 +26,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType;
@Unstable @Unstable
public class CSAssignment { public class CSAssignment {
final private Resource resource; final private Resource resource;
final private NodeType type; private NodeType type;
public CSAssignment(Resource resource, NodeType type) { public CSAssignment(Resource resource, NodeType type) {
this.resource = resource; this.resource = resource;
@ -41,6 +41,10 @@ public class CSAssignment {
return type; return type;
} }
public void setType(NodeType type) {
this.type = type;
}
@Override @Override
public String toString() { public String toString() {
return resource.getMemory() + ":" + type; return resource.getMemory() + ":" + type;

View File

@ -519,7 +519,6 @@ public class ParentQueue implements CSQueue {
Resource clusterResource, SchedulerNode node) { Resource clusterResource, SchedulerNode node) {
CSAssignment assignment = CSAssignment assignment =
new CSAssignment(Resources.createResource(0), NodeType.NODE_LOCAL); new CSAssignment(Resources.createResource(0), NodeType.NODE_LOCAL);
boolean assignedOffSwitch = false;
while (canAssign(node)) { while (canAssign(node)) {
if (LOG.isDebugEnabled()) { if (LOG.isDebugEnabled()) {
@ -535,7 +534,7 @@ public class ParentQueue implements CSQueue {
// Schedule // Schedule
CSAssignment assignedToChild = CSAssignment assignedToChild =
assignContainersToChildQueues(clusterResource, node); assignContainersToChildQueues(clusterResource, node);
assignedOffSwitch = (assignedToChild.getType() == NodeType.OFF_SWITCH); assignment.setType(assignedToChild.getType());
// Done if no child-queue assigned anything // Done if no child-queue assigned anything
if (Resources.greaterThan(assignedToChild.getResource(), if (Resources.greaterThan(assignedToChild.getResource(),
@ -566,15 +565,13 @@ public class ParentQueue implements CSQueue {
// Do not assign more than one container if this isn't the root queue // Do not assign more than one container if this isn't the root queue
// or if we've already assigned an off-switch container // or if we've already assigned an off-switch container
if (rootQueue) { if (!rootQueue || assignment.getType() == NodeType.OFF_SWITCH) {
if (assignedOffSwitch) { if (LOG.isDebugEnabled()) {
if (LOG.isDebugEnabled()) { if (rootQueue && assignment.getType() == NodeType.OFF_SWITCH) {
LOG.debug("Not assigning more than one off-switch container," + LOG.debug("Not assigning more than one off-switch container," +
" assignments so far: " + assignment); " assignments so far: " + assignment);
} }
break;
} }
} else {
break; break;
} }
} }

View File

@ -499,6 +499,71 @@ public class TestParentQueue {
} }
@Test
public void testOffSwitchSchedulingMultiLevelQueues() throws Exception {
// Setup queue configs
setupMultiLevelQueues(csConf);
//B3
Map<String, CSQueue> queues = new HashMap<String, CSQueue>();
CSQueue root =
CapacityScheduler.parseQueue(csContext, csConf, null,
CapacitySchedulerConfiguration.ROOT, queues, queues,
CapacityScheduler.queueComparator,
CapacityScheduler.applicationComparator,
TestUtils.spyHook);
// Setup some nodes
final int memoryPerNode = 10;
final int numNodes = 2;
SchedulerNode node_0 =
TestUtils.getMockNode("host_0", DEFAULT_RACK, 0, memoryPerNode*GB);
SchedulerNode node_1 =
TestUtils.getMockNode("host_1", DEFAULT_RACK, 0, memoryPerNode*GB);
final Resource clusterResource =
Resources.createResource(numNodes * (memoryPerNode*GB));
when(csContext.getNumClusterNodes()).thenReturn(numNodes);
// Start testing
LeafQueue b3 = (LeafQueue)queues.get(B3);
LeafQueue b2 = (LeafQueue)queues.get(B2);
// Simulate B3 returning a container on node_0
stubQueueAllocation(b2, clusterResource, node_0, 0*GB, NodeType.OFF_SWITCH);
stubQueueAllocation(b3, clusterResource, node_0, 1*GB, NodeType.OFF_SWITCH);
root.assignContainers(clusterResource, node_0);
verifyQueueMetrics(b2, 0*GB, clusterResource);
verifyQueueMetrics(b3, 1*GB, clusterResource);
// Now, B2 should get the scheduling opportunity since B2=0G/2G, B3=1G/7G
// also, B3 gets a scheduling opportunity since B2 allocates RACK_LOCAL
stubQueueAllocation(b2, clusterResource, node_1, 1*GB, NodeType.RACK_LOCAL);
stubQueueAllocation(b3, clusterResource, node_1, 1*GB, NodeType.OFF_SWITCH);
root.assignContainers(clusterResource, node_1);
InOrder allocationOrder = inOrder(b2, b3);
allocationOrder.verify(b2).assignContainers(eq(clusterResource),
any(SchedulerNode.class));
allocationOrder.verify(b3).assignContainers(eq(clusterResource),
any(SchedulerNode.class));
verifyQueueMetrics(b2, 1*GB, clusterResource);
verifyQueueMetrics(b3, 2*GB, clusterResource);
// Now, B3 should get the scheduling opportunity
// since B2 has 1/2G while B3 has 2/7G,
// However, since B3 returns off-switch, B2 won't get an opportunity
stubQueueAllocation(b2, clusterResource, node_0, 1*GB, NodeType.NODE_LOCAL);
stubQueueAllocation(b3, clusterResource, node_0, 1*GB, NodeType.OFF_SWITCH);
root.assignContainers(clusterResource, node_0);
allocationOrder = inOrder(b3, b2);
allocationOrder.verify(b3).assignContainers(eq(clusterResource),
any(SchedulerNode.class));
allocationOrder.verify(b2).assignContainers(eq(clusterResource),
any(SchedulerNode.class));
verifyQueueMetrics(b2, 1*GB, clusterResource);
verifyQueueMetrics(b3, 3*GB, clusterResource);
}
public boolean hasQueueACL(List<QueueUserACLInfo> aclInfos, QueueACL acl, String qName) { public boolean hasQueueACL(List<QueueUserACLInfo> aclInfos, QueueACL acl, String qName) {
for (QueueUserACLInfo aclInfo : aclInfos) { for (QueueUserACLInfo aclInfo : aclInfos) {