MAPREDUCE-4073. CS assigns multiple off-switch containers when using multi-level-queues (Siddharth Seth via bobby)
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1310056 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
98df316058
commit
c76b264196
|
@ -261,6 +261,9 @@ Release 0.23.3 - UNRELEASED
|
||||||
MAPREDUCE-3621. TestDBJob and TestDataDrivenDBInputFormat ant tests fail
|
MAPREDUCE-3621. TestDBJob and TestDataDrivenDBInputFormat ant tests fail
|
||||||
(Ravi Prakash via tgraves)
|
(Ravi Prakash via tgraves)
|
||||||
|
|
||||||
|
MAPREDUCE-4073. CS assigns multiple off-switch containers when using
|
||||||
|
multi-level-queues (Siddharth Seth via bobby)
|
||||||
|
|
||||||
Release 0.23.2 - UNRELEASED
|
Release 0.23.2 - UNRELEASED
|
||||||
|
|
||||||
INCOMPATIBLE CHANGES
|
INCOMPATIBLE CHANGES
|
||||||
|
|
|
@ -26,7 +26,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType;
|
||||||
@Unstable
|
@Unstable
|
||||||
public class CSAssignment {
|
public class CSAssignment {
|
||||||
final private Resource resource;
|
final private Resource resource;
|
||||||
final private NodeType type;
|
private NodeType type;
|
||||||
|
|
||||||
public CSAssignment(Resource resource, NodeType type) {
|
public CSAssignment(Resource resource, NodeType type) {
|
||||||
this.resource = resource;
|
this.resource = resource;
|
||||||
|
@ -41,6 +41,10 @@ public class CSAssignment {
|
||||||
return type;
|
return type;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public void setType(NodeType type) {
|
||||||
|
this.type = type;
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public String toString() {
|
public String toString() {
|
||||||
return resource.getMemory() + ":" + type;
|
return resource.getMemory() + ":" + type;
|
||||||
|
|
|
@ -519,7 +519,6 @@ public class ParentQueue implements CSQueue {
|
||||||
Resource clusterResource, SchedulerNode node) {
|
Resource clusterResource, SchedulerNode node) {
|
||||||
CSAssignment assignment =
|
CSAssignment assignment =
|
||||||
new CSAssignment(Resources.createResource(0), NodeType.NODE_LOCAL);
|
new CSAssignment(Resources.createResource(0), NodeType.NODE_LOCAL);
|
||||||
boolean assignedOffSwitch = false;
|
|
||||||
|
|
||||||
while (canAssign(node)) {
|
while (canAssign(node)) {
|
||||||
if (LOG.isDebugEnabled()) {
|
if (LOG.isDebugEnabled()) {
|
||||||
|
@ -535,7 +534,7 @@ public class ParentQueue implements CSQueue {
|
||||||
// Schedule
|
// Schedule
|
||||||
CSAssignment assignedToChild =
|
CSAssignment assignedToChild =
|
||||||
assignContainersToChildQueues(clusterResource, node);
|
assignContainersToChildQueues(clusterResource, node);
|
||||||
assignedOffSwitch = (assignedToChild.getType() == NodeType.OFF_SWITCH);
|
assignment.setType(assignedToChild.getType());
|
||||||
|
|
||||||
// Done if no child-queue assigned anything
|
// Done if no child-queue assigned anything
|
||||||
if (Resources.greaterThan(assignedToChild.getResource(),
|
if (Resources.greaterThan(assignedToChild.getResource(),
|
||||||
|
@ -566,15 +565,13 @@ public class ParentQueue implements CSQueue {
|
||||||
|
|
||||||
// Do not assign more than one container if this isn't the root queue
|
// Do not assign more than one container if this isn't the root queue
|
||||||
// or if we've already assigned an off-switch container
|
// or if we've already assigned an off-switch container
|
||||||
if (rootQueue) {
|
if (!rootQueue || assignment.getType() == NodeType.OFF_SWITCH) {
|
||||||
if (assignedOffSwitch) {
|
|
||||||
if (LOG.isDebugEnabled()) {
|
if (LOG.isDebugEnabled()) {
|
||||||
|
if (rootQueue && assignment.getType() == NodeType.OFF_SWITCH) {
|
||||||
LOG.debug("Not assigning more than one off-switch container," +
|
LOG.debug("Not assigning more than one off-switch container," +
|
||||||
" assignments so far: " + assignment);
|
" assignments so far: " + assignment);
|
||||||
}
|
}
|
||||||
break;
|
|
||||||
}
|
}
|
||||||
} else {
|
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -499,6 +499,71 @@ public class TestParentQueue {
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testOffSwitchSchedulingMultiLevelQueues() throws Exception {
|
||||||
|
// Setup queue configs
|
||||||
|
setupMultiLevelQueues(csConf);
|
||||||
|
//B3
|
||||||
|
Map<String, CSQueue> queues = new HashMap<String, CSQueue>();
|
||||||
|
CSQueue root =
|
||||||
|
CapacityScheduler.parseQueue(csContext, csConf, null,
|
||||||
|
CapacitySchedulerConfiguration.ROOT, queues, queues,
|
||||||
|
CapacityScheduler.queueComparator,
|
||||||
|
CapacityScheduler.applicationComparator,
|
||||||
|
TestUtils.spyHook);
|
||||||
|
|
||||||
|
// Setup some nodes
|
||||||
|
final int memoryPerNode = 10;
|
||||||
|
final int numNodes = 2;
|
||||||
|
|
||||||
|
SchedulerNode node_0 =
|
||||||
|
TestUtils.getMockNode("host_0", DEFAULT_RACK, 0, memoryPerNode*GB);
|
||||||
|
SchedulerNode node_1 =
|
||||||
|
TestUtils.getMockNode("host_1", DEFAULT_RACK, 0, memoryPerNode*GB);
|
||||||
|
|
||||||
|
final Resource clusterResource =
|
||||||
|
Resources.createResource(numNodes * (memoryPerNode*GB));
|
||||||
|
when(csContext.getNumClusterNodes()).thenReturn(numNodes);
|
||||||
|
|
||||||
|
// Start testing
|
||||||
|
LeafQueue b3 = (LeafQueue)queues.get(B3);
|
||||||
|
LeafQueue b2 = (LeafQueue)queues.get(B2);
|
||||||
|
|
||||||
|
// Simulate B3 returning a container on node_0
|
||||||
|
stubQueueAllocation(b2, clusterResource, node_0, 0*GB, NodeType.OFF_SWITCH);
|
||||||
|
stubQueueAllocation(b3, clusterResource, node_0, 1*GB, NodeType.OFF_SWITCH);
|
||||||
|
root.assignContainers(clusterResource, node_0);
|
||||||
|
verifyQueueMetrics(b2, 0*GB, clusterResource);
|
||||||
|
verifyQueueMetrics(b3, 1*GB, clusterResource);
|
||||||
|
|
||||||
|
// Now, B2 should get the scheduling opportunity since B2=0G/2G, B3=1G/7G
|
||||||
|
// also, B3 gets a scheduling opportunity since B2 allocates RACK_LOCAL
|
||||||
|
stubQueueAllocation(b2, clusterResource, node_1, 1*GB, NodeType.RACK_LOCAL);
|
||||||
|
stubQueueAllocation(b3, clusterResource, node_1, 1*GB, NodeType.OFF_SWITCH);
|
||||||
|
root.assignContainers(clusterResource, node_1);
|
||||||
|
InOrder allocationOrder = inOrder(b2, b3);
|
||||||
|
allocationOrder.verify(b2).assignContainers(eq(clusterResource),
|
||||||
|
any(SchedulerNode.class));
|
||||||
|
allocationOrder.verify(b3).assignContainers(eq(clusterResource),
|
||||||
|
any(SchedulerNode.class));
|
||||||
|
verifyQueueMetrics(b2, 1*GB, clusterResource);
|
||||||
|
verifyQueueMetrics(b3, 2*GB, clusterResource);
|
||||||
|
|
||||||
|
// Now, B3 should get the scheduling opportunity
|
||||||
|
// since B2 has 1/2G while B3 has 2/7G,
|
||||||
|
// However, since B3 returns off-switch, B2 won't get an opportunity
|
||||||
|
stubQueueAllocation(b2, clusterResource, node_0, 1*GB, NodeType.NODE_LOCAL);
|
||||||
|
stubQueueAllocation(b3, clusterResource, node_0, 1*GB, NodeType.OFF_SWITCH);
|
||||||
|
root.assignContainers(clusterResource, node_0);
|
||||||
|
allocationOrder = inOrder(b3, b2);
|
||||||
|
allocationOrder.verify(b3).assignContainers(eq(clusterResource),
|
||||||
|
any(SchedulerNode.class));
|
||||||
|
allocationOrder.verify(b2).assignContainers(eq(clusterResource),
|
||||||
|
any(SchedulerNode.class));
|
||||||
|
verifyQueueMetrics(b2, 1*GB, clusterResource);
|
||||||
|
verifyQueueMetrics(b3, 3*GB, clusterResource);
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
public boolean hasQueueACL(List<QueueUserACLInfo> aclInfos, QueueACL acl, String qName) {
|
public boolean hasQueueACL(List<QueueUserACLInfo> aclInfos, QueueACL acl, String qName) {
|
||||||
for (QueueUserACLInfo aclInfo : aclInfos) {
|
for (QueueUserACLInfo aclInfo : aclInfos) {
|
||||||
|
|
Loading…
Reference in New Issue