diff --git a/hadoop-mapreduce-project/CHANGES.txt b/hadoop-mapreduce-project/CHANGES.txt index e13e1f2d419..db7231e280d 100644 --- a/hadoop-mapreduce-project/CHANGES.txt +++ b/hadoop-mapreduce-project/CHANGES.txt @@ -115,6 +115,10 @@ Release 0.23.1 - Unreleased MAPREDUCE-3553. Add support for data returned when exceptions thrown from web service apis to be in either xml or in JSON. (Thomas Graves via mahadev) + MAPREDUCE-3641. Making CapacityScheduler more conservative so as to + assign only one off-switch container in a single scheduling + iteration. (Arun C Murthy via vinodkv) + OPTIMIZATIONS MAPREDUCE-3567. Extraneous JobConf objects in AM heap. (Vinod Kumar diff --git a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/resource/Resources.java b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/resource/Resources.java index e2b062cb847..428246d485d 100644 --- a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/resource/Resources.java +++ b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/resource/Resources.java @@ -26,8 +26,26 @@ @Private @Evolving public class Resources { + // Java doesn't have const :( - private static final Resource NONE = createResource(0); + private static final Resource NONE = new Resource() { + + @Override + public int getMemory() { + return 0; + } + + @Override + public void setMemory(int memory) { + throw new RuntimeException("NONE cannot be modified!"); + } + + @Override + public int compareTo(Resource o) { + return (0 - o.getMemory()); + } + + }; public static Resource createResource(int memory) { Resource resource = Records.newRecord(Resource.class); @@ -36,7 +54,6 @@ public static Resource createResource(int memory) { } public static Resource none() { - assert NONE.getMemory() == 0 : "NONE should be empty"; return NONE; } diff --git a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSAssignment.java b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSAssignment.java new file mode 100644 index 00000000000..270f97d8aa0 --- /dev/null +++ b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSAssignment.java @@ -0,0 +1,48 @@ +/** +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +*/ +package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity; + +import org.apache.hadoop.classification.InterfaceAudience.Private; +import org.apache.hadoop.classification.InterfaceStability.Unstable; +import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType; + +@Private +@Unstable +public class CSAssignment { + final private Resource resource; + final private NodeType type; + + public CSAssignment(Resource resource, NodeType type) { + this.resource = resource; + this.type = type; + } + + public Resource getResource() { + return resource; + } + + public NodeType getType() { + return type; + } + + @Override + public String toString() { + return resource.getMemory() + ":" + type; + } +} \ No newline at end of file diff --git a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueue.java b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueue.java index f2c9533a228..01532de9911 100644 --- a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueue.java +++ b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueue.java @@ -155,9 +155,10 @@ public void submitApplication(SchedulerApp application, String user, * Assign containers to applications in the queue or it's children (if any). * @param clusterResource the resource of the cluster. * @param node node on which resources are available - * @return the resource that is being assigned. + * @return the assignment */ - public Resource assignContainers(Resource clusterResource, SchedulerNode node); + public CSAssignment assignContainers( + Resource clusterResource, SchedulerNode node); /** * A container assigned to the queue has completed. diff --git a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java index 8716294ef83..5f1314113ee 100644 --- a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java +++ b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java @@ -19,7 +19,6 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity; import java.io.IOException; -import java.net.InetSocketAddress; import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.Collections; @@ -35,7 +34,6 @@ import org.apache.commons.logging.LogFactory; import org.apache.hadoop.classification.InterfaceAudience.Private; import org.apache.hadoop.classification.InterfaceStability.Unstable; -import org.apache.hadoop.net.NetUtils; import org.apache.hadoop.security.AccessControlException; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.authorize.AccessControlList; @@ -703,8 +701,11 @@ private synchronized SchedulerApp getApplication( return applicationsMap.get(applicationAttemptId); } + private static final CSAssignment NULL_ASSIGNMENT = + new CSAssignment(Resources.createResource(0), NodeType.NODE_LOCAL); + @Override - public synchronized Resource + public synchronized CSAssignment assignContainers(Resource clusterResource, SchedulerNode node) { if(LOG.isDebugEnabled()) { @@ -717,8 +718,11 @@ private synchronized SchedulerApp getApplication( if (reservedContainer != null) { SchedulerApp application = getApplication(reservedContainer.getApplicationAttemptId()); - return assignReservedContainer(application, node, reservedContainer, - clusterResource); + return new CSAssignment( + assignReservedContainer(application, node, reservedContainer, + clusterResource), + NodeType.NODE_LOCAL); // Don't care about locality constraints + // for reserved containers } // Try to assign containers to applications in order @@ -746,7 +750,7 @@ private synchronized SchedulerApp getApplication( // Are we going over limits by allocating to this application? // Maximum Capacity of the queue if (!assignToQueue(clusterResource, required)) { - return Resources.none(); + return NULL_ASSIGNMENT; } // User limits @@ -760,24 +764,23 @@ private synchronized SchedulerApp getApplication( application.addSchedulingOpportunity(priority); // Try to schedule - Resource assigned = + CSAssignment assignment = assignContainersOnNode(clusterResource, node, application, priority, null); - + + Resource assigned = assignment.getResource(); + // Did we schedule or reserve a container? if (Resources.greaterThan(assigned, Resources.none())) { - Resource assignedResource = - application.getResourceRequest(priority, RMNode.ANY).getCapability(); // Book-keeping - allocateResource(clusterResource, - application, assignedResource); + allocateResource(clusterResource, application, assigned); // Reset scheduling opportunities application.resetSchedulingOpportunities(priority); // Done - return assignedResource; + return assignment; } else { // Do not assign out of order w.r.t priorities break; @@ -792,7 +795,7 @@ private synchronized SchedulerApp getApplication( application.showRequests(); } - return Resources.none(); + return NULL_ASSIGNMENT; } @@ -809,11 +812,12 @@ private synchronized Resource assignReservedContainer(SchedulerApp application, container.getId(), SchedulerUtils.UNRESERVED_CONTAINER), RMContainerEventType.RELEASED); - return container.getResource(); + return container.getResource(); // Ugh, return resource to force re-sort } // Try to assign if we have sufficient resources - assignContainersOnNode(clusterResource, node, application, priority, rmContainer); + assignContainersOnNode(clusterResource, node, application, priority, + rmContainer); // Doesn't matter... since it's already charged for at time of reservation // "re-reservation" is *free* @@ -966,7 +970,7 @@ boolean needContainers(SchedulerApp application, Priority priority, Resource req return (((starvation + requiredContainers) - reservedContainers) > 0); } - private Resource assignContainersOnNode(Resource clusterResource, + private CSAssignment assignContainersOnNode(Resource clusterResource, SchedulerNode node, SchedulerApp application, Priority priority, RMContainer reservedContainer) { @@ -977,7 +981,7 @@ private Resource assignContainersOnNode(Resource clusterResource, assignNodeLocalContainers(clusterResource, node, application, priority, reservedContainer); if (Resources.greaterThan(assigned, Resources.none())) { - return assigned; + return new CSAssignment(assigned, NodeType.NODE_LOCAL); } // Rack-local @@ -985,12 +989,14 @@ private Resource assignContainersOnNode(Resource clusterResource, assignRackLocalContainers(clusterResource, node, application, priority, reservedContainer); if (Resources.greaterThan(assigned, Resources.none())) { - return assigned; + return new CSAssignment(assigned, NodeType.RACK_LOCAL); } // Off-switch - return assignOffSwitchContainers(clusterResource, node, application, - priority, reservedContainer); + return new CSAssignment( + assignOffSwitchContainers(clusterResource, node, application, + priority, reservedContainer), + NodeType.OFF_SWITCH); } private Resource assignNodeLocalContainers(Resource clusterResource, @@ -1272,7 +1278,7 @@ synchronized void allocateResource(Resource clusterResource, metrics.setAvailableResourcesToUser(userName, application.getHeadroom()); LOG.info(getQueueName() + " used=" + usedResources + " numContainers=" + numContainers + - " user=" + userName + " resources=" + user.getConsumedResources()); + " user=" + userName + " user-resources=" + user.getConsumedResources()); } synchronized void releaseResource(Resource clusterResource, @@ -1290,7 +1296,7 @@ synchronized void releaseResource(Resource clusterResource, LOG.info(getQueueName() + " used=" + usedResources + " numContainers=" + numContainers + - " user=" + userName + " resources=" + user.getConsumedResources()); + " user=" + userName + " user-resources=" + user.getConsumedResources()); } @Override diff --git a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java index 7d912009cf5..4fea3227c1d 100644 --- a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java +++ b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java @@ -48,6 +48,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.resource.Resources; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEventType; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApp; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode; @@ -500,10 +501,12 @@ synchronized void setMaxCapacity(float maximumCapacity) { } @Override - public synchronized Resource assignContainers( + public synchronized CSAssignment assignContainers( Resource clusterResource, SchedulerNode node) { - Resource assigned = Resources.createResource(0); - + CSAssignment assignment = + new CSAssignment(Resources.createResource(0), NodeType.NODE_LOCAL); + boolean assignedOffSwitch = false; + while (canAssign(node)) { if (LOG.isDebugEnabled()) { LOG.debug("Trying to assign containers to child-queue of " @@ -516,16 +519,18 @@ public synchronized Resource assignContainers( } // Schedule - Resource assignedToChild = + CSAssignment assignedToChild = assignContainersToChildQueues(clusterResource, node); + assignedOffSwitch = (assignedToChild.getType() == NodeType.OFF_SWITCH); // Done if no child-queue assigned anything - if (Resources.greaterThan(assignedToChild, Resources.none())) { + if (Resources.greaterThan(assignedToChild.getResource(), + Resources.none())) { // Track resource utilization for the parent-queue - allocateResource(clusterResource, assignedToChild); + allocateResource(clusterResource, assignedToChild.getResource()); // Track resource utilization in this pass of the scheduler - Resources.addTo(assigned, assignedToChild); + Resources.addTo(assignment.getResource(), assignedToChild.getResource()); LOG.info("assignedContainer" + " queue=" + getQueueName() + @@ -539,17 +544,26 @@ public synchronized Resource assignContainers( if (LOG.isDebugEnabled()) { LOG.debug("ParentQ=" + getQueueName() - + " assignedSoFarInThisIteration=" + assigned + + " assignedSoFarInThisIteration=" + assignment.getResource() + " utilization=" + getUtilization()); } // Do not assign more than one container if this isn't the root queue - if (!rootQueue) { + // or if we've already assigned an off-switch container + if (rootQueue) { + if (assignedOffSwitch) { + if (LOG.isDebugEnabled()) { + LOG.debug("Not assigning more than one off-switch container," + + " assignments so far: " + assignment); + } + break; + } + } else { break; } } - return assigned; + return assignment; } private synchronized boolean assignToQueue(Resource clusterResource) { @@ -573,9 +587,10 @@ private boolean canAssign(SchedulerNode node) { minimumAllocation); } - synchronized Resource assignContainersToChildQueues(Resource cluster, + synchronized CSAssignment assignContainersToChildQueues(Resource cluster, SchedulerNode node) { - Resource assigned = Resources.createResource(0); + CSAssignment assignment = + new CSAssignment(Resources.createResource(0), NodeType.NODE_LOCAL); printChildQueues(); @@ -586,25 +601,28 @@ synchronized Resource assignContainersToChildQueues(Resource cluster, LOG.debug("Trying to assign to queue: " + childQueue.getQueuePath() + " stats: " + childQueue); } - assigned = childQueue.assignContainers(cluster, node); + assignment = childQueue.assignContainers(cluster, node); if(LOG.isDebugEnabled()) { - LOG.debug("Assignedto queue: " + childQueue.getQueuePath() - + " stats: " + childQueue + " --> " + assigned.getMemory()); + LOG.debug("Assigned to queue: " + childQueue.getQueuePath() + + " stats: " + childQueue + " --> " + + assignment.getResource().getMemory() + ", " + assignment.getType()); } // If we do assign, remove the queue and re-insert in-order to re-sort - if (Resources.greaterThan(assigned, Resources.none())) { + if (Resources.greaterThan(assignment.getResource(), Resources.none())) { // Remove and re-insert to sort iter.remove(); LOG.info("Re-sorting queues since queue: " + childQueue.getQueuePath() + " stats: " + childQueue); childQueues.add(childQueue); - printChildQueues(); + if (LOG.isDebugEnabled()) { + printChildQueues(); + } break; } } - return assigned; + return assignment; } String getChildQueuesToPrint() { diff --git a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java index 96dbe3b5a33..534697099cf 100644 --- a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java +++ b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java @@ -811,49 +811,56 @@ public void testLocalityScheduling() throws Exception { app_0.updateResourceRequests(app_0_requests_0); // Start testing... + CSAssignment assignment = null; // Start with off switch, shouldn't allocate due to delay scheduling - a.assignContainers(clusterResource, node_2); + assignment = a.assignContainers(clusterResource, node_2); verify(app_0, never()).allocate(any(NodeType.class), eq(node_2), any(Priority.class), any(ResourceRequest.class), any(Container.class)); assertEquals(1, app_0.getSchedulingOpportunities(priority)); assertEquals(3, app_0.getTotalRequiredResources(priority)); + assertEquals(NodeType.NODE_LOCAL, assignment.getType()); // None->NODE_LOCAL // Another off switch, shouldn't allocate due to delay scheduling - a.assignContainers(clusterResource, node_2); + assignment = a.assignContainers(clusterResource, node_2); verify(app_0, never()).allocate(any(NodeType.class), eq(node_2), any(Priority.class), any(ResourceRequest.class), any(Container.class)); assertEquals(2, app_0.getSchedulingOpportunities(priority)); assertEquals(3, app_0.getTotalRequiredResources(priority)); + assertEquals(NodeType.NODE_LOCAL, assignment.getType()); // None->NODE_LOCAL // Another off switch, shouldn't allocate due to delay scheduling - a.assignContainers(clusterResource, node_2); + assignment = a.assignContainers(clusterResource, node_2); verify(app_0, never()).allocate(any(NodeType.class), eq(node_2), any(Priority.class), any(ResourceRequest.class), any(Container.class)); assertEquals(3, app_0.getSchedulingOpportunities(priority)); assertEquals(3, app_0.getTotalRequiredResources(priority)); + assertEquals(NodeType.NODE_LOCAL, assignment.getType()); // None->NODE_LOCAL // Another off switch, now we should allocate // since missedOpportunities=3 and reqdContainers=3 - a.assignContainers(clusterResource, node_2); + assignment = a.assignContainers(clusterResource, node_2); verify(app_0).allocate(eq(NodeType.OFF_SWITCH), eq(node_2), any(Priority.class), any(ResourceRequest.class), any(Container.class)); assertEquals(0, app_0.getSchedulingOpportunities(priority)); // should reset assertEquals(2, app_0.getTotalRequiredResources(priority)); + assertEquals(NodeType.OFF_SWITCH, assignment.getType()); // NODE_LOCAL - node_0 - a.assignContainers(clusterResource, node_0); + assignment = a.assignContainers(clusterResource, node_0); verify(app_0).allocate(eq(NodeType.NODE_LOCAL), eq(node_0), any(Priority.class), any(ResourceRequest.class), any(Container.class)); assertEquals(0, app_0.getSchedulingOpportunities(priority)); // should reset assertEquals(1, app_0.getTotalRequiredResources(priority)); + assertEquals(NodeType.NODE_LOCAL, assignment.getType()); // NODE_LOCAL - node_1 - a.assignContainers(clusterResource, node_1); + assignment = a.assignContainers(clusterResource, node_1); verify(app_0).allocate(eq(NodeType.NODE_LOCAL), eq(node_1), any(Priority.class), any(ResourceRequest.class), any(Container.class)); assertEquals(0, app_0.getSchedulingOpportunities(priority)); // should reset assertEquals(0, app_0.getTotalRequiredResources(priority)); + assertEquals(NodeType.NODE_LOCAL, assignment.getType()); // Add 1 more request to check for RACK_LOCAL app_0_requests_0.clear(); @@ -872,11 +879,12 @@ public void testLocalityScheduling() throws Exception { String host_3 = "host_3"; // on rack_1 SchedulerNode node_3 = TestUtils.getMockNode(host_3, rack_1, 0, 8*GB); - a.assignContainers(clusterResource, node_3); + assignment = a.assignContainers(clusterResource, node_3); verify(app_0).allocate(eq(NodeType.RACK_LOCAL), eq(node_3), any(Priority.class), any(ResourceRequest.class), any(Container.class)); assertEquals(0, app_0.getSchedulingOpportunities(priority)); // should reset assertEquals(0, app_0.getTotalRequiredResources(priority)); + assertEquals(NodeType.RACK_LOCAL, assignment.getType()); } @Test diff --git a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestParentQueue.java b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestParentQueue.java index 638ed0edb82..bbfd503b4de 100644 --- a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestParentQueue.java +++ b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestParentQueue.java @@ -29,6 +29,7 @@ import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.server.resourcemanager.RMContext; import org.apache.hadoop.yarn.server.resourcemanager.resource.Resources; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApp; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode; @@ -92,11 +93,18 @@ private SchedulerApp getMockApplication(int appId, String user) { private void stubQueueAllocation(final CSQueue queue, final Resource clusterResource, final SchedulerNode node, final int allocation) { + stubQueueAllocation(queue, clusterResource, node, allocation, + NodeType.NODE_LOCAL); + } + + private void stubQueueAllocation(final CSQueue queue, + final Resource clusterResource, final SchedulerNode node, + final int allocation, final NodeType type) { // Simulate the queue allocation - doAnswer(new Answer() { + doAnswer(new Answer() { @Override - public Resource answer(InvocationOnMock invocation) throws Throwable { + public CSAssignment answer(InvocationOnMock invocation) throws Throwable { try { throw new Exception(); } catch (Exception e) { @@ -115,8 +123,8 @@ public Resource answer(InvocationOnMock invocation) throws Throwable { // Next call - nothing if (allocation > 0) { - doReturn(Resources.none()).when(queue).assignContainers( - eq(clusterResource), eq(node)); + doReturn(new CSAssignment(Resources.none(), type)). + when(queue).assignContainers(eq(clusterResource), eq(node)); // Mock the node's resource availability Resource available = node.getAvailableResource(); @@ -124,7 +132,7 @@ public Resource answer(InvocationOnMock invocation) throws Throwable { when(node).getAvailableResource(); } - return allocatedResource; + return new CSAssignment(allocatedResource, type); } }). when(queue).assignContainers(eq(clusterResource), eq(node)); @@ -401,6 +409,78 @@ public void testMultiLevelQueues() throws Exception { } + @Test + public void testOffSwitchScheduling() throws Exception { + // Setup queue configs + setupSingleLevelQueues(csConf); + + Map queues = new HashMap(); + CSQueue root = + CapacityScheduler.parseQueue(csContext, csConf, null, + CapacitySchedulerConfiguration.ROOT, queues, queues, + CapacityScheduler.queueComparator, + CapacityScheduler.applicationComparator, + TestUtils.spyHook); + + // Setup some nodes + final int memoryPerNode = 10; + final int numNodes = 2; + + SchedulerNode node_0 = + TestUtils.getMockNode("host_0", DEFAULT_RACK, 0, memoryPerNode*GB); + SchedulerNode node_1 = + TestUtils.getMockNode("host_1", DEFAULT_RACK, 0, memoryPerNode*GB); + + final Resource clusterResource = + Resources.createResource(numNodes * (memoryPerNode*GB)); + when(csContext.getNumClusterNodes()).thenReturn(numNodes); + + // Start testing + LeafQueue a = (LeafQueue)queues.get(A); + LeafQueue b = (LeafQueue)queues.get(B); + final float delta = 0.0001f; + + // Simulate B returning a container on node_0 + stubQueueAllocation(a, clusterResource, node_0, 0*GB, NodeType.OFF_SWITCH); + stubQueueAllocation(b, clusterResource, node_0, 1*GB, NodeType.OFF_SWITCH); + root.assignContainers(clusterResource, node_0); + assertEquals(0.0f, a.getUtilization(), delta); + assertEquals(computeQueueUtilization(b, 1*GB, clusterResource), + b.getUtilization(), delta); + + // Now, A should get the scheduling opportunity since A=0G/6G, B=1G/14G + // also, B gets a scheduling opportunity since A allocates RACK_LOCAL + stubQueueAllocation(a, clusterResource, node_1, 2*GB, NodeType.RACK_LOCAL); + stubQueueAllocation(b, clusterResource, node_1, 1*GB, NodeType.OFF_SWITCH); + root.assignContainers(clusterResource, node_1); + InOrder allocationOrder = inOrder(a, b); + allocationOrder.verify(a).assignContainers(eq(clusterResource), + any(SchedulerNode.class)); + allocationOrder.verify(b).assignContainers(eq(clusterResource), + any(SchedulerNode.class)); + assertEquals(computeQueueUtilization(a, 2*GB, clusterResource), + a.getUtilization(), delta); + assertEquals(computeQueueUtilization(b, 2*GB, clusterResource), + b.getUtilization(), delta); + + // Now, B should get the scheduling opportunity + // since A has 2/6G while B has 2/14G, + // However, since B returns off-switch, A won't get an opportunity + stubQueueAllocation(a, clusterResource, node_0, 1*GB, NodeType.NODE_LOCAL); + stubQueueAllocation(b, clusterResource, node_0, 2*GB, NodeType.OFF_SWITCH); + root.assignContainers(clusterResource, node_0); + allocationOrder = inOrder(b, a); + allocationOrder.verify(b).assignContainers(eq(clusterResource), + any(SchedulerNode.class)); + allocationOrder.verify(a).assignContainers(eq(clusterResource), + any(SchedulerNode.class)); + assertEquals(computeQueueUtilization(a, 2*GB, clusterResource), + a.getUtilization(), delta); + assertEquals(computeQueueUtilization(b, 4*GB, clusterResource), + b.getUtilization(), delta); + + } + @After public void tearDown() throws Exception { }