diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt index ba023a5260b..cff7f6bfae8 100644 --- a/hadoop-yarn-project/CHANGES.txt +++ b/hadoop-yarn-project/CHANGES.txt @@ -391,6 +391,9 @@ Release 2.8.0 - UNRELEASED YARN-2768. Avoid cloning Resource in FSAppAttempt#updateDemand. (Hong Zhiguo via kasha) + YARN-3983. Refactored CapacityScheduler#FiCaSchedulerApp to make it easier + to extend container allocation logic. (Wangda Tan via jianhe) + BUG FIXES YARN-3197. Confusing log generated by CapacityScheduler. (Varun Saxena diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java index dcc42058c46..134b9414648 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java @@ -53,13 +53,7 @@ import org.apache.hadoop.yarn.util.resource.Resources; import com.google.common.collect.Sets; public abstract class AbstractCSQueue implements CSQueue { - private static final Log LOG = LogFactory.getLog(AbstractCSQueue.class); - - static final CSAssignment NULL_ASSIGNMENT = - new CSAssignment(Resources.createResource(0, 0), NodeType.NODE_LOCAL); - - static final CSAssignment SKIP_ASSIGNMENT = new CSAssignment(true); - + private static final Log LOG = LogFactory.getLog(AbstractCSQueue.class); CSQueue parent; final String queueName; volatile int numContainers; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSAssignment.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSAssignment.java index ceb6f7e49f7..928437f96af 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSAssignment.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSAssignment.java @@ -24,12 +24,17 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.AssignmentInformation; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp; +import org.apache.hadoop.yarn.util.resource.Resources; @Private @Unstable public class CSAssignment { + public static final CSAssignment NULL_ASSIGNMENT = + new CSAssignment(Resources.createResource(0, 0), NodeType.NODE_LOCAL); - final private Resource resource; + public static final CSAssignment SKIP_ASSIGNMENT = new CSAssignment(true); + + private Resource resource; private NodeType type; private RMContainer excessReservation; private FiCaSchedulerApp
application; @@ -67,6 +72,10 @@ public class CSAssignment { public Resource getResource() { return resource; } + + public void setResource(Resource resource) { + this.resource = resource; + } public NodeType getType() { return type; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java index acfbad0c03c..a71cc68639c 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java @@ -777,7 +777,7 @@ public class LeafQueue extends AbstractCSQueue { // if our queue cannot access this node, just return if (schedulingMode == SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY && !accessibleToPartition(node.getPartition())) { - return NULL_ASSIGNMENT; + return CSAssignment.NULL_ASSIGNMENT; } // Check if this queue need more resource, simply skip allocation if this @@ -789,7 +789,7 @@ public class LeafQueue extends AbstractCSQueue { + ", because it doesn't need more resource, schedulingMode=" + schedulingMode.name() + " node-partition=" + node.getPartition()); } - return NULL_ASSIGNMENT; + return CSAssignment.NULL_ASSIGNMENT; } for (Iterator assignmentIterator = @@ -800,7 +800,7 @@ public class LeafQueue extends AbstractCSQueue { if (!super.canAssignToThisQueue(clusterResource, node.getPartition(), currentResourceLimits, application.getCurrentReservation(), schedulingMode)) { - return NULL_ASSIGNMENT; + return CSAssignment.NULL_ASSIGNMENT; } Resource userLimit = @@ -846,11 +846,11 @@ public class LeafQueue extends AbstractCSQueue { } else if (!assignment.getSkipped()) { // If we don't allocate anything, and it is not skipped by application, // we will return to respect FIFO of applications - return NULL_ASSIGNMENT; + return CSAssignment.NULL_ASSIGNMENT; } } - return NULL_ASSIGNMENT; + return CSAssignment.NULL_ASSIGNMENT; } protected Resource getHeadroom(User user, Resource queueCurrentLimit, diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java index e54b9e2a59f..725aea18403 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java @@ -384,7 +384,7 @@ public class ParentQueue extends AbstractCSQueue { // if our queue cannot access this node, just return if (schedulingMode == SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY && !accessibleToPartition(node.getPartition())) { - return NULL_ASSIGNMENT; + return CSAssignment.NULL_ASSIGNMENT; } // Check 
if this queue need more resource, simply skip allocation if this @@ -396,7 +396,7 @@ public class ParentQueue extends AbstractCSQueue { + ", because it doesn't need more resource, schedulingMode=" + schedulingMode.name() + " node-partition=" + node.getPartition()); } - return NULL_ASSIGNMENT; + return CSAssignment.NULL_ASSIGNMENT; } CSAssignment assignment = diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/AllocationState.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/AllocationState.java new file mode 100644 index 00000000000..d1580bd995d --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/AllocationState.java @@ -0,0 +1,28 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.allocator; + +public enum AllocationState { + APP_SKIPPED, + PRIORITY_SKIPPED, + LOCALITY_SKIPPED, + QUEUE_SKIPPED, + ALLOCATED, + RESERVED +} \ No newline at end of file diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/ContainerAllocation.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/ContainerAllocation.java new file mode 100644 index 00000000000..00c1bb974fb --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/ContainerAllocation.java @@ -0,0 +1,76 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.allocator; + +import org.apache.hadoop.yarn.api.records.Container; +import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType; +import org.apache.hadoop.yarn.util.resource.Resources; + +public class ContainerAllocation { + public static final ContainerAllocation PRIORITY_SKIPPED = + new ContainerAllocation(null, null, AllocationState.PRIORITY_SKIPPED); + + public static final ContainerAllocation APP_SKIPPED = + new ContainerAllocation(null, null, AllocationState.APP_SKIPPED); + + public static final ContainerAllocation QUEUE_SKIPPED = + new ContainerAllocation(null, null, AllocationState.QUEUE_SKIPPED); + + public static final ContainerAllocation LOCALITY_SKIPPED = + new ContainerAllocation(null, null, AllocationState.LOCALITY_SKIPPED); + + RMContainer containerToBeUnreserved; + private Resource resourceToBeAllocated = Resources.none(); + AllocationState state; + NodeType containerNodeType = NodeType.NODE_LOCAL; + NodeType requestNodeType = NodeType.NODE_LOCAL; + Container updatedContainer; + + public ContainerAllocation(RMContainer containerToBeUnreserved, + Resource resourceToBeAllocated, AllocationState state) { + this.containerToBeUnreserved = containerToBeUnreserved; + this.resourceToBeAllocated = resourceToBeAllocated; + this.state = state; + } + + public RMContainer getContainerToBeUnreserved() { + return containerToBeUnreserved; + } + + public Resource getResourceToBeAllocated() { + if (resourceToBeAllocated == null) { + return Resources.none(); + } + return resourceToBeAllocated; + } + + public AllocationState getAllocationState() { + return state; + } + + public NodeType getContainerNodeType() { + return containerNodeType; + } + + public Container getUpdatedContainer() { + return updatedContainer; + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/ContainerAllocator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/ContainerAllocator.java new file mode 100644 index 00000000000..b4168dd28e0 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/ContainerAllocator.java @@ -0,0 +1,115 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.allocator; + +import org.apache.hadoop.yarn.api.records.Priority; +import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.server.resourcemanager.RMContext; +import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager; +import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceLimits; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.SchedulingMode; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode; +import org.apache.hadoop.yarn.util.resource.ResourceCalculator; +import org.apache.hadoop.yarn.util.resource.Resources; + +/** + * Given an application, its resource limits and its resource requests, decide + * how to allocate containers. This makes application resource allocation + * logic extensible. + */ +public abstract class ContainerAllocator { + FiCaSchedulerApp application; + final ResourceCalculator rc; + final RMContext rmContext; + + public ContainerAllocator(FiCaSchedulerApp application, + ResourceCalculator rc, RMContext rmContext) { + this.application = application; + this.rc = rc; + this.rmContext = rmContext; + } + + /** + * preAllocation performs checks to decide whether a container can be + * allocated, and puts the necessary information into the returned + * {@link ContainerAllocation}. + */ + abstract ContainerAllocation preAllocation( + Resource clusterResource, FiCaSchedulerNode node, + SchedulingMode schedulingMode, ResourceLimits resourceLimits, + Priority priority, RMContainer reservedContainer); + + /** + * doAllocation updates application metrics, creates containers, etc., + * according to the allocation decision made by preAllocation. + */ + abstract ContainerAllocation doAllocation( + ContainerAllocation allocationResult, Resource clusterResource, + FiCaSchedulerNode node, SchedulingMode schedulingMode, Priority priority, + RMContainer reservedContainer); + + boolean checkHeadroom(Resource clusterResource, + ResourceLimits currentResourceLimits, Resource required, + FiCaSchedulerNode node) { + // If headroom + currentReservation < required, we cannot satisfy this + // request + Resource resourceCouldBeUnReserved = application.getCurrentReservation(); + if (!application.getCSLeafQueue().getReservationContinueLooking() + || !node.getPartition().equals(RMNodeLabelsManager.NO_LABEL)) { + // If reservation continue-looking is disabled, OR we're looking at a + // non-default node partition, we do not allow unreserving before + // allocation.
+ resourceCouldBeUnReserved = Resources.none(); + } + return Resources.greaterThanOrEqual(rc, clusterResource, Resources.add( + currentResourceLimits.getHeadroom(), resourceCouldBeUnReserved), + required); + } + + /** + * allocate needs to handle following stuffs: + * + * + */ + public ContainerAllocation allocate(Resource clusterResource, + FiCaSchedulerNode node, SchedulingMode schedulingMode, + ResourceLimits resourceLimits, Priority priority, + RMContainer reservedContainer) { + ContainerAllocation result = + preAllocation(clusterResource, node, schedulingMode, + resourceLimits, priority, reservedContainer); + + if (AllocationState.ALLOCATED == result.state + || AllocationState.RESERVED == result.state) { + result = doAllocation(result, clusterResource, node, + schedulingMode, priority, reservedContainer); + } + + return result; + } +} \ No newline at end of file diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/RegularContainerAllocator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/RegularContainerAllocator.java new file mode 100644 index 00000000000..6effcd3a64e --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/RegularContainerAllocator.java @@ -0,0 +1,629 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.allocator; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.yarn.api.records.Container; +import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.NodeId; +import org.apache.hadoop.yarn.api.records.Priority; +import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.api.records.ResourceRequest; +import org.apache.hadoop.yarn.server.resourcemanager.RMContext; +import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt; +import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceLimits; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerAppUtils; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.SchedulingMode; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode; +import org.apache.hadoop.yarn.server.utils.BuilderUtils; +import org.apache.hadoop.yarn.util.resource.ResourceCalculator; +import org.apache.hadoop.yarn.util.resource.Resources; + +/** + * Allocate normal (new) containers, considers locality/label, etc. Using + * delayed scheduling mechanism to get better locality allocation. + */ +public class RegularContainerAllocator extends ContainerAllocator { + private static final Log LOG = LogFactory.getLog(RegularContainerAllocator.class); + + private ResourceRequest lastResourceRequest = null; + + public RegularContainerAllocator(FiCaSchedulerApp application, + ResourceCalculator rc, RMContext rmContext) { + super(application, rc, rmContext); + } + + private ContainerAllocation preCheckForNewContainer(Resource clusterResource, + FiCaSchedulerNode node, SchedulingMode schedulingMode, + ResourceLimits resourceLimits, Priority priority) { + if (SchedulerAppUtils.isBlacklisted(application, node, LOG)) { + return ContainerAllocation.APP_SKIPPED; + } + + ResourceRequest anyRequest = + application.getResourceRequest(priority, ResourceRequest.ANY); + if (null == anyRequest) { + return ContainerAllocation.PRIORITY_SKIPPED; + } + + // Required resource + Resource required = anyRequest.getCapability(); + + // Do we need containers at this 'priority'? 
+ if (application.getTotalRequiredResources(priority) <= 0) { + return ContainerAllocation.PRIORITY_SKIPPED; + } + + // AM container allocation doesn't support non-exclusive allocation, to + // avoid the pain of preempting an AM container + if (schedulingMode == SchedulingMode.IGNORE_PARTITION_EXCLUSIVITY) { + RMAppAttempt rmAppAttempt = + rmContext.getRMApps().get(application.getApplicationId()) + .getCurrentAppAttempt(); + if (rmAppAttempt.getSubmissionContext().getUnmanagedAM() == false + && null == rmAppAttempt.getMasterContainer()) { + if (LOG.isDebugEnabled()) { + LOG.debug("Skip allocating AM container to app_attempt=" + + application.getApplicationAttemptId() + + ", allocating an AM container in non-exclusive mode is not allowed"); + } + return ContainerAllocation.APP_SKIPPED; + } + } + + // Does the node-label-expression of this off-switch resource request + // match the node's label? + // If not, jump to the next priority. + if (!SchedulerUtils.checkResourceRequestMatchingNodePartition(anyRequest, + node.getPartition(), schedulingMode)) { + return ContainerAllocation.PRIORITY_SKIPPED; + } + + if (!application.getCSLeafQueue().getReservationContinueLooking()) { + if (!shouldAllocOrReserveNewContainer(priority, required)) { + if (LOG.isDebugEnabled()) { + LOG.debug("doesn't need containers based on reservation algo!"); + } + return ContainerAllocation.PRIORITY_SKIPPED; + } + } + + if (!checkHeadroom(clusterResource, resourceLimits, required, node)) { + if (LOG.isDebugEnabled()) { + LOG.debug("cannot allocate required resource=" + required + + " because of headroom"); + } + return ContainerAllocation.QUEUE_SKIPPED; + } + + // Inform the application it is about to get a scheduling opportunity + application.addSchedulingOpportunity(priority); + + // Increase missed-non-partitioned-resource-request-opportunity. + // This is to make sure non-partitioned resource requests prefer + // to be allocated to non-partitioned nodes + int missedNonPartitionedRequestSchedulingOpportunity = 0; + if (anyRequest.getNodeLabelExpression() + .equals(RMNodeLabelsManager.NO_LABEL)) { + missedNonPartitionedRequestSchedulingOpportunity = + application + .addMissedNonPartitionedRequestSchedulingOpportunity(priority); + } + + if (schedulingMode == SchedulingMode.IGNORE_PARTITION_EXCLUSIVITY) { + // Before doing allocation, we need to check the scheduling opportunity to + // make sure non-partitioned resource requests are scheduled to + // the non-partitioned partition first.
+ if (missedNonPartitionedRequestSchedulingOpportunity < rmContext + .getScheduler().getNumClusterNodes()) { + if (LOG.isDebugEnabled()) { + LOG.debug("Skip app_attempt=" + application.getApplicationAttemptId() + + " priority=" + priority + + " because missed-non-partitioned-resource-request" + + " opportunity under requred:" + " Now=" + + missedNonPartitionedRequestSchedulingOpportunity + " required=" + + rmContext.getScheduler().getNumClusterNodes()); + } + + return ContainerAllocation.APP_SKIPPED; + } + } + + return null; + } + + @Override + ContainerAllocation preAllocation(Resource clusterResource, + FiCaSchedulerNode node, SchedulingMode schedulingMode, + ResourceLimits resourceLimits, Priority priority, + RMContainer reservedContainer) { + ContainerAllocation result; + if (null == reservedContainer) { + // pre-check when allocating new container + result = + preCheckForNewContainer(clusterResource, node, schedulingMode, + resourceLimits, priority); + if (null != result) { + return result; + } + } else { + // pre-check when allocating reserved container + if (application.getTotalRequiredResources(priority) == 0) { + // Release + return new ContainerAllocation(reservedContainer, null, + AllocationState.QUEUE_SKIPPED); + } + } + + // Try to allocate containers on node + result = + assignContainersOnNode(clusterResource, node, priority, + reservedContainer, schedulingMode, resourceLimits); + + if (null == reservedContainer) { + if (result.state == AllocationState.PRIORITY_SKIPPED) { + // Don't count 'skipped nodes' as a scheduling opportunity! + application.subtractSchedulingOpportunity(priority); + } + } + + return result; + } + + public synchronized float getLocalityWaitFactor( + Priority priority, int clusterNodes) { + // Estimate: Required unique resources (i.e. hosts + racks) + int requiredResources = + Math.max(application.getResourceRequests(priority).size() - 1, 0); + + // waitFactor can't be more than '1' + // i.e. no point skipping more than clustersize opportunities + return Math.min(((float)requiredResources / clusterNodes), 1.0f); + } + + private int getActualNodeLocalityDelay() { + return Math.min(rmContext.getScheduler().getNumClusterNodes(), application + .getCSLeafQueue().getNodeLocalityDelay()); + } + + private boolean canAssign(Priority priority, FiCaSchedulerNode node, + NodeType type, RMContainer reservedContainer) { + + // Clearly we need containers for this application... + if (type == NodeType.OFF_SWITCH) { + if (reservedContainer != null) { + return true; + } + + // 'Delay' off-switch + ResourceRequest offSwitchRequest = + application.getResourceRequest(priority, ResourceRequest.ANY); + long missedOpportunities = application.getSchedulingOpportunities(priority); + long requiredContainers = offSwitchRequest.getNumContainers(); + + float localityWaitFactor = + getLocalityWaitFactor(priority, rmContext.getScheduler() + .getNumClusterNodes()); + + return ((requiredContainers * localityWaitFactor) < missedOpportunities); + } + + // Check if we need containers on this rack + ResourceRequest rackLocalRequest = + application.getResourceRequest(priority, node.getRackName()); + if (rackLocalRequest == null || rackLocalRequest.getNumContainers() <= 0) { + return false; + } + + // If we are here, we do need containers on this rack for RACK_LOCAL req + if (type == NodeType.RACK_LOCAL) { + // 'Delay' rack-local just a little bit... 
+ long missedOpportunities = application.getSchedulingOpportunities(priority); + return getActualNodeLocalityDelay() < missedOpportunities; + } + + // Check if we need containers on this host + if (type == NodeType.NODE_LOCAL) { + // Now check if we need containers on this host... + ResourceRequest nodeLocalRequest = + application.getResourceRequest(priority, node.getNodeName()); + if (nodeLocalRequest != null) { + return nodeLocalRequest.getNumContainers() > 0; + } + } + + return false; + } + + private ContainerAllocation assignNodeLocalContainers( + Resource clusterResource, ResourceRequest nodeLocalResourceRequest, + FiCaSchedulerNode node, Priority priority, RMContainer reservedContainer, + SchedulingMode schedulingMode, ResourceLimits currentResoureLimits) { + if (canAssign(priority, node, NodeType.NODE_LOCAL, reservedContainer)) { + return assignContainer(clusterResource, node, priority, + nodeLocalResourceRequest, NodeType.NODE_LOCAL, reservedContainer, + schedulingMode, currentResoureLimits); + } + + // Skip node-local request, go to rack-local request + return ContainerAllocation.LOCALITY_SKIPPED; + } + + private ContainerAllocation assignRackLocalContainers( + Resource clusterResource, ResourceRequest rackLocalResourceRequest, + FiCaSchedulerNode node, Priority priority, RMContainer reservedContainer, + SchedulingMode schedulingMode, ResourceLimits currentResoureLimits) { + if (canAssign(priority, node, NodeType.RACK_LOCAL, reservedContainer)) { + return assignContainer(clusterResource, node, priority, + rackLocalResourceRequest, NodeType.RACK_LOCAL, reservedContainer, + schedulingMode, currentResoureLimits); + } + + // Skip rack-local request, go to off-switch request + return ContainerAllocation.LOCALITY_SKIPPED; + } + + private ContainerAllocation assignOffSwitchContainers( + Resource clusterResource, ResourceRequest offSwitchResourceRequest, + FiCaSchedulerNode node, Priority priority, RMContainer reservedContainer, + SchedulingMode schedulingMode, ResourceLimits currentResoureLimits) { + if (canAssign(priority, node, NodeType.OFF_SWITCH, reservedContainer)) { + return assignContainer(clusterResource, node, priority, + offSwitchResourceRequest, NodeType.OFF_SWITCH, reservedContainer, + schedulingMode, currentResoureLimits); + } + + return ContainerAllocation.QUEUE_SKIPPED; + } + + private ContainerAllocation assignContainersOnNode(Resource clusterResource, + FiCaSchedulerNode node, Priority priority, RMContainer reservedContainer, + SchedulingMode schedulingMode, ResourceLimits currentResoureLimits) { + + ContainerAllocation assigned; + + NodeType requestType = null; + // Data-local + ResourceRequest nodeLocalResourceRequest = + application.getResourceRequest(priority, node.getNodeName()); + if (nodeLocalResourceRequest != null) { + requestType = NodeType.NODE_LOCAL; + assigned = + assignNodeLocalContainers(clusterResource, nodeLocalResourceRequest, + node, priority, reservedContainer, schedulingMode, + currentResoureLimits); + if (Resources.greaterThan(rc, clusterResource, + assigned.getResourceToBeAllocated(), Resources.none())) { + assigned.requestNodeType = requestType; + return assigned; + } + } + + // Rack-local + ResourceRequest rackLocalResourceRequest = + application.getResourceRequest(priority, node.getRackName()); + if (rackLocalResourceRequest != null) { + if (!rackLocalResourceRequest.getRelaxLocality()) { + return ContainerAllocation.PRIORITY_SKIPPED; + } + + if (requestType != NodeType.NODE_LOCAL) { + requestType = NodeType.RACK_LOCAL; + } + + assigned = + 
assignRackLocalContainers(clusterResource, rackLocalResourceRequest, + node, priority, reservedContainer, schedulingMode, + currentResoureLimits); + if (Resources.greaterThan(rc, clusterResource, + assigned.getResourceToBeAllocated(), Resources.none())) { + assigned.requestNodeType = requestType; + return assigned; + } + } + + // Off-switch + ResourceRequest offSwitchResourceRequest = + application.getResourceRequest(priority, ResourceRequest.ANY); + if (offSwitchResourceRequest != null) { + if (!offSwitchResourceRequest.getRelaxLocality()) { + return ContainerAllocation.PRIORITY_SKIPPED; + } + if (requestType != NodeType.NODE_LOCAL + && requestType != NodeType.RACK_LOCAL) { + requestType = NodeType.OFF_SWITCH; + } + + assigned = + assignOffSwitchContainers(clusterResource, offSwitchResourceRequest, + node, priority, reservedContainer, schedulingMode, + currentResoureLimits); + assigned.requestNodeType = requestType; + + return assigned; + } + + return ContainerAllocation.PRIORITY_SKIPPED; + } + + private ContainerAllocation assignContainer(Resource clusterResource, + FiCaSchedulerNode node, Priority priority, ResourceRequest request, + NodeType type, RMContainer rmContainer, SchedulingMode schedulingMode, + ResourceLimits currentResoureLimits) { + lastResourceRequest = request; + + if (LOG.isDebugEnabled()) { + LOG.debug("assignContainers: node=" + node.getNodeName() + + " application=" + application.getApplicationId() + + " priority=" + priority.getPriority() + + " request=" + request + " type=" + type); + } + + // check if the resource request can access the label + if (!SchedulerUtils.checkResourceRequestMatchingNodePartition(request, + node.getPartition(), schedulingMode)) { + // this is a reserved container, but we cannot allocate it now according + // to label not match. This can be caused by node label changed + // We should un-reserve this container. + return new ContainerAllocation(rmContainer, null, + AllocationState.QUEUE_SKIPPED); + } + + Resource capability = request.getCapability(); + Resource available = node.getAvailableResource(); + Resource totalResource = node.getTotalResource(); + + if (!Resources.lessThanOrEqual(rc, clusterResource, + capability, totalResource)) { + LOG.warn("Node : " + node.getNodeID() + + " does not have sufficient resource for request : " + request + + " node total capability : " + node.getTotalResource()); + return ContainerAllocation.QUEUE_SKIPPED; + } + + assert Resources.greaterThan( + rc, clusterResource, available, Resources.none()); + + boolean shouldAllocOrReserveNewContainer = shouldAllocOrReserveNewContainer( + priority, capability); + + // Can we allocate a container on this node? + int availableContainers = + rc.computeAvailableContainers(available, capability); + + // How much need to unreserve equals to: + // max(required - headroom, amountNeedUnreserve) + Resource resourceNeedToUnReserve = + Resources.max(rc, clusterResource, + Resources.subtract(capability, currentResoureLimits.getHeadroom()), + currentResoureLimits.getAmountNeededUnreserve()); + + boolean needToUnreserve = + Resources.greaterThan(rc, clusterResource, + resourceNeedToUnReserve, Resources.none()); + + RMContainer unreservedContainer = null; + boolean reservationsContinueLooking = + application.getCSLeafQueue().getReservationContinueLooking(); + + if (availableContainers > 0) { + // Allocate... 
+ // We will only do continuous reservation when this is not allocated from + // reserved container + if (rmContainer == null && reservationsContinueLooking + && node.getLabels().isEmpty()) { + // when reservationsContinueLooking is set, we may need to unreserve + // some containers to meet this queue, its parents', or the users' + // resource limits. + // TODO, need change here when we want to support continuous reservation + // looking for labeled partitions. + if (!shouldAllocOrReserveNewContainer || needToUnreserve) { + if (!needToUnreserve) { + // If we shouldn't allocate/reserve new container then we should + // unreserve one the same size we are asking for since the + // currentResoureLimits.getAmountNeededUnreserve could be zero. If + // the limit was hit then use the amount we need to unreserve to be + // under the limit. + resourceNeedToUnReserve = capability; + } + unreservedContainer = + application.findNodeToUnreserve(clusterResource, node, priority, + resourceNeedToUnReserve); + // When (minimum-unreserved-resource > 0 OR we cannot allocate + // new/reserved + // container (That means we *have to* unreserve some resource to + // continue)). If we failed to unreserve some resource, we can't + // continue. + if (null == unreservedContainer) { + return ContainerAllocation.QUEUE_SKIPPED; + } + } + } + + ContainerAllocation result = + new ContainerAllocation(unreservedContainer, request.getCapability(), + AllocationState.ALLOCATED); + result.containerNodeType = type; + return result; + } else { + // if we are allowed to allocate but this node doesn't have space, reserve it or + // if this was an already a reserved container, reserve it again + if (shouldAllocOrReserveNewContainer || rmContainer != null) { + + if (reservationsContinueLooking && rmContainer == null) { + // we could possibly ignoring queue capacity or user limits when + // reservationsContinueLooking is set. Make sure we didn't need to unreserve + // one. + if (needToUnreserve) { + if (LOG.isDebugEnabled()) { + LOG.debug("we needed to unreserve to be able to allocate"); + } + return ContainerAllocation.QUEUE_SKIPPED; + } + } + + ContainerAllocation result = + new ContainerAllocation(null, request.getCapability(), + AllocationState.RESERVED); + result.containerNodeType = type; + return result; + } + return ContainerAllocation.QUEUE_SKIPPED; + } + } + + boolean + shouldAllocOrReserveNewContainer(Priority priority, Resource required) { + int requiredContainers = application.getTotalRequiredResources(priority); + int reservedContainers = application.getNumReservedContainers(priority); + int starvation = 0; + if (reservedContainers > 0) { + float nodeFactor = + Resources + .ratio(rc, required, application.getCSLeafQueue().getMaximumAllocation()); + + // Use percentage of node required to bias against large containers... 
+ // Protect against corner case where you need the whole node with + // Math.min(nodeFactor, minimumAllocationFactor) + starvation = + (int) ((application.getReReservations(priority) / + (float) reservedContainers) * (1.0f - (Math.min( + nodeFactor, application.getCSLeafQueue() + .getMinimumAllocationFactor())))); + + if (LOG.isDebugEnabled()) { + LOG.debug("needsContainers:" + " app.#re-reserve=" + + application.getReReservations(priority) + " reserved=" + + reservedContainers + " nodeFactor=" + nodeFactor + + " minAllocFactor=" + + application.getCSLeafQueue().getMinimumAllocationFactor() + + " starvation=" + starvation); + } + } + return (((starvation + requiredContainers) - reservedContainers) > 0); + } + + private Container getContainer(RMContainer rmContainer, + FiCaSchedulerNode node, Resource capability, Priority priority) { + return (rmContainer != null) ? rmContainer.getContainer() + : createContainer(node, capability, priority); + } + + private Container createContainer(FiCaSchedulerNode node, Resource capability, + Priority priority) { + + NodeId nodeId = node.getRMNode().getNodeID(); + ContainerId containerId = + BuilderUtils.newContainerId(application.getApplicationAttemptId(), + application.getNewContainerId()); + + // Create the container + return BuilderUtils.newContainer(containerId, nodeId, node.getRMNode() + .getHttpAddress(), capability, priority, null); + } + + private ContainerAllocation handleNewContainerAllocation( + ContainerAllocation allocationResult, FiCaSchedulerNode node, + Priority priority, RMContainer reservedContainer, Container container) { + // Handling container allocation + // Did we previously reserve containers at this 'priority'? + if (reservedContainer != null) { + application.unreserve(priority, node, reservedContainer); + } + + // Inform the application + RMContainer allocatedContainer = + application.allocate(allocationResult.containerNodeType, node, + priority, lastResourceRequest, container); + + // Does the application need this resource? + if (allocatedContainer == null) { + // Skip this app if we failed to allocate. 
+ ContainerAllocation ret = + new ContainerAllocation(allocationResult.containerToBeUnreserved, + null, AllocationState.QUEUE_SKIPPED); + ret.state = AllocationState.APP_SKIPPED; + return ret; + } + + // Inform the node + node.allocateContainer(allocatedContainer); + + // update locality statistics + application.incNumAllocatedContainers(allocationResult.containerNodeType, + allocationResult.requestNodeType); + + return allocationResult; + } + + @Override + ContainerAllocation doAllocation(ContainerAllocation allocationResult, + Resource clusterResource, FiCaSchedulerNode node, + SchedulingMode schedulingMode, Priority priority, + RMContainer reservedContainer) { + // Create the container if necessary + Container container = + getContainer(reservedContainer, node, + allocationResult.getResourceToBeAllocated(), priority); + + // something went wrong getting/creating the container + if (container == null) { + LOG.warn("Couldn't get container for allocation!"); + return ContainerAllocation.QUEUE_SKIPPED; + } + + if (allocationResult.getAllocationState() == AllocationState.ALLOCATED) { + // When allocating container + allocationResult = + handleNewContainerAllocation(allocationResult, node, priority, + reservedContainer, container); + } else { + // When reserving container + application.reserve(priority, node, reservedContainer, container); + } + allocationResult.updatedContainer = container; + + // Only reset opportunities when we FIRST allocate the container. (IAW, When + // reservedContainer != null, it's not the first time) + if (reservedContainer == null) { + // Don't reset scheduling opportunities for off-switch assignments + // otherwise the app will be delayed for each non-local assignment. + // This helps apps with many off-cluster requests schedule faster. + if (allocationResult.containerNodeType != NodeType.OFF_SWITCH) { + if (LOG.isDebugEnabled()) { + LOG.debug("Resetting scheduling opportunities"); + } + application.resetSchedulingOpportunities(priority); + } + + // Non-exclusive scheduling opportunity is different: we need reset + // it every time to make sure non-labeled resource request will be + // most likely allocated on non-labeled nodes first. 
+ application.resetMissedNonPartitionedRequestSchedulingOpportunity(priority); + } + + return allocationResult; + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java index c660fcbe3b9..d75b2c39a22 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java @@ -24,7 +24,6 @@ import java.util.List; import java.util.Map; import java.util.Set; -import org.apache.commons.lang.mutable.MutableObject; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.classification.InterfaceAudience.Private; @@ -40,9 +39,7 @@ import org.apache.hadoop.yarn.api.records.ResourceRequest; import org.apache.hadoop.yarn.server.resourcemanager.RMAuditLogger; import org.apache.hadoop.yarn.server.resourcemanager.RMAuditLogger.AuditConstants; import org.apache.hadoop.yarn.server.resourcemanager.RMContext; -import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; -import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEventType; @@ -54,15 +51,16 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Queue; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceLimits; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerAppUtils; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CSAssignment; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityHeadroomProvider; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.LeafQueue; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.SchedulingMode; -import org.apache.hadoop.yarn.server.utils.BuilderUtils; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.allocator.AllocationState; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.allocator.ContainerAllocator; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.allocator.RegularContainerAllocator; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.allocator.ContainerAllocation; import 
org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator; import org.apache.hadoop.yarn.util.resource.ResourceCalculator; import org.apache.hadoop.yarn.util.resource.Resources; @@ -78,11 +76,6 @@ import com.google.common.annotations.VisibleForTesting; public class FiCaSchedulerApp extends SchedulerApplicationAttempt { private static final Log LOG = LogFactory.getLog(FiCaSchedulerApp.class); - static final CSAssignment NULL_ASSIGNMENT = - new CSAssignment(Resources.createResource(0, 0), NodeType.NODE_LOCAL); - - static final CSAssignment SKIP_ASSIGNMENT = new CSAssignment(true); - private final Set containersToPreempt = new HashSet(); @@ -91,6 +84,8 @@ public class FiCaSchedulerApp extends SchedulerApplicationAttempt { private ResourceCalculator rc = new DefaultResourceCalculator(); private ResourceScheduler scheduler; + + private ContainerAllocator containerAllocator; public FiCaSchedulerApp(ApplicationAttemptId applicationAttemptId, String user, Queue queue, ActiveUsersManager activeUsersManager, @@ -124,6 +119,8 @@ public class FiCaSchedulerApp extends SchedulerApplicationAttempt { if (scheduler.getResourceCalculator() != null) { rc = scheduler.getResourceCalculator(); } + + containerAllocator = new RegularContainerAllocator(this, rc, rmContext); } synchronized public boolean containerCompleted(RMContainer rmContainer, @@ -386,223 +383,6 @@ public class FiCaSchedulerApp extends SchedulerApplicationAttempt { ((FiCaSchedulerApp) appAttempt).getHeadroomProvider(); } - private int getActualNodeLocalityDelay() { - return Math.min(scheduler.getNumClusterNodes(), getCSLeafQueue() - .getNodeLocalityDelay()); - } - - private boolean canAssign(Priority priority, FiCaSchedulerNode node, - NodeType type, RMContainer reservedContainer) { - - // Clearly we need containers for this application... - if (type == NodeType.OFF_SWITCH) { - if (reservedContainer != null) { - return true; - } - - // 'Delay' off-switch - ResourceRequest offSwitchRequest = - getResourceRequest(priority, ResourceRequest.ANY); - long missedOpportunities = getSchedulingOpportunities(priority); - long requiredContainers = offSwitchRequest.getNumContainers(); - - float localityWaitFactor = - getLocalityWaitFactor(priority, scheduler.getNumClusterNodes()); - - return ((requiredContainers * localityWaitFactor) < missedOpportunities); - } - - // Check if we need containers on this rack - ResourceRequest rackLocalRequest = - getResourceRequest(priority, node.getRackName()); - if (rackLocalRequest == null || rackLocalRequest.getNumContainers() <= 0) { - return false; - } - - // If we are here, we do need containers on this rack for RACK_LOCAL req - if (type == NodeType.RACK_LOCAL) { - // 'Delay' rack-local just a little bit... - long missedOpportunities = getSchedulingOpportunities(priority); - return getActualNodeLocalityDelay() < missedOpportunities; - } - - // Check if we need containers on this host - if (type == NodeType.NODE_LOCAL) { - // Now check if we need containers on this host... 
- ResourceRequest nodeLocalRequest = - getResourceRequest(priority, node.getNodeName()); - if (nodeLocalRequest != null) { - return nodeLocalRequest.getNumContainers() > 0; - } - } - - return false; - } - - boolean - shouldAllocOrReserveNewContainer(Priority priority, Resource required) { - int requiredContainers = getTotalRequiredResources(priority); - int reservedContainers = getNumReservedContainers(priority); - int starvation = 0; - if (reservedContainers > 0) { - float nodeFactor = - Resources.ratio( - rc, required, getCSLeafQueue().getMaximumAllocation() - ); - - // Use percentage of node required to bias against large containers... - // Protect against corner case where you need the whole node with - // Math.min(nodeFactor, minimumAllocationFactor) - starvation = - (int)((getReReservations(priority) / (float)reservedContainers) * - (1.0f - (Math.min(nodeFactor, getCSLeafQueue().getMinimumAllocationFactor()))) - ); - - if (LOG.isDebugEnabled()) { - LOG.debug("needsContainers:" + - " app.#re-reserve=" + getReReservations(priority) + - " reserved=" + reservedContainers + - " nodeFactor=" + nodeFactor + - " minAllocFactor=" + getCSLeafQueue().getMinimumAllocationFactor() + - " starvation=" + starvation); - } - } - return (((starvation + requiredContainers) - reservedContainers) > 0); - } - - private CSAssignment assignNodeLocalContainers(Resource clusterResource, - ResourceRequest nodeLocalResourceRequest, FiCaSchedulerNode node, - Priority priority, - RMContainer reservedContainer, MutableObject allocatedContainer, - SchedulingMode schedulingMode, ResourceLimits currentResoureLimits) { - if (canAssign(priority, node, NodeType.NODE_LOCAL, - reservedContainer)) { - return assignContainer(clusterResource, node, priority, - nodeLocalResourceRequest, NodeType.NODE_LOCAL, reservedContainer, - allocatedContainer, schedulingMode, currentResoureLimits); - } - - return new CSAssignment(Resources.none(), NodeType.NODE_LOCAL); - } - - private CSAssignment assignRackLocalContainers(Resource clusterResource, - ResourceRequest rackLocalResourceRequest, FiCaSchedulerNode node, - Priority priority, - RMContainer reservedContainer, MutableObject allocatedContainer, - SchedulingMode schedulingMode, ResourceLimits currentResoureLimits) { - if (canAssign(priority, node, NodeType.RACK_LOCAL, - reservedContainer)) { - return assignContainer(clusterResource, node, priority, - rackLocalResourceRequest, NodeType.RACK_LOCAL, reservedContainer, - allocatedContainer, schedulingMode, currentResoureLimits); - } - - return new CSAssignment(Resources.none(), NodeType.RACK_LOCAL); - } - - private CSAssignment assignOffSwitchContainers(Resource clusterResource, - ResourceRequest offSwitchResourceRequest, FiCaSchedulerNode node, - Priority priority, - RMContainer reservedContainer, MutableObject allocatedContainer, - SchedulingMode schedulingMode, ResourceLimits currentResoureLimits) { - if (canAssign(priority, node, NodeType.OFF_SWITCH, - reservedContainer)) { - return assignContainer(clusterResource, node, priority, - offSwitchResourceRequest, NodeType.OFF_SWITCH, reservedContainer, - allocatedContainer, schedulingMode, currentResoureLimits); - } - - return new CSAssignment(Resources.none(), NodeType.OFF_SWITCH); - } - - private CSAssignment assignContainersOnNode(Resource clusterResource, - FiCaSchedulerNode node, Priority priority, - RMContainer reservedContainer, SchedulingMode schedulingMode, - ResourceLimits currentResoureLimits) { - - CSAssignment assigned; - - NodeType requestType = null; - MutableObject 
allocatedContainer = new MutableObject(); - // Data-local - ResourceRequest nodeLocalResourceRequest = - getResourceRequest(priority, node.getNodeName()); - if (nodeLocalResourceRequest != null) { - requestType = NodeType.NODE_LOCAL; - assigned = - assignNodeLocalContainers(clusterResource, nodeLocalResourceRequest, - node, priority, reservedContainer, - allocatedContainer, schedulingMode, currentResoureLimits); - if (Resources.greaterThan(rc, clusterResource, - assigned.getResource(), Resources.none())) { - - //update locality statistics - if (allocatedContainer.getValue() != null) { - incNumAllocatedContainers(NodeType.NODE_LOCAL, - requestType); - } - assigned.setType(NodeType.NODE_LOCAL); - return assigned; - } - } - - // Rack-local - ResourceRequest rackLocalResourceRequest = - getResourceRequest(priority, node.getRackName()); - if (rackLocalResourceRequest != null) { - if (!rackLocalResourceRequest.getRelaxLocality()) { - return SKIP_ASSIGNMENT; - } - - if (requestType != NodeType.NODE_LOCAL) { - requestType = NodeType.RACK_LOCAL; - } - - assigned = - assignRackLocalContainers(clusterResource, rackLocalResourceRequest, - node, priority, reservedContainer, - allocatedContainer, schedulingMode, currentResoureLimits); - if (Resources.greaterThan(rc, clusterResource, - assigned.getResource(), Resources.none())) { - - //update locality statistics - if (allocatedContainer.getValue() != null) { - incNumAllocatedContainers(NodeType.RACK_LOCAL, - requestType); - } - assigned.setType(NodeType.RACK_LOCAL); - return assigned; - } - } - - // Off-switch - ResourceRequest offSwitchResourceRequest = - getResourceRequest(priority, ResourceRequest.ANY); - if (offSwitchResourceRequest != null) { - if (!offSwitchResourceRequest.getRelaxLocality()) { - return SKIP_ASSIGNMENT; - } - if (requestType != NodeType.NODE_LOCAL - && requestType != NodeType.RACK_LOCAL) { - requestType = NodeType.OFF_SWITCH; - } - - assigned = - assignOffSwitchContainers(clusterResource, offSwitchResourceRequest, - node, priority, reservedContainer, - allocatedContainer, schedulingMode, currentResoureLimits); - - // update locality statistics - if (allocatedContainer.getValue() != null) { - incNumAllocatedContainers(NodeType.OFF_SWITCH, requestType); - } - assigned.setType(NodeType.OFF_SWITCH); - return assigned; - } - - return SKIP_ASSIGNMENT; - } - public void reserve(Priority priority, FiCaSchedulerNode node, RMContainer rmContainer, Container container) { // Update reserved metrics if this is the first reservation @@ -618,25 +398,6 @@ public class FiCaSchedulerApp extends SchedulerApplicationAttempt { node.reserveResource(this, priority, rmContainer); } - private Container getContainer(RMContainer rmContainer, - FiCaSchedulerNode node, Resource capability, Priority priority) { - return (rmContainer != null) ? 
rmContainer.getContainer() - : createContainer(node, capability, priority); - } - - Container createContainer(FiCaSchedulerNode node, Resource capability, - Priority priority) { - - NodeId nodeId = node.getRMNode().getNodeID(); - ContainerId containerId = - BuilderUtils.newContainerId(getApplicationAttemptId(), - getNewContainerId()); - - // Create the container - return BuilderUtils.newContainer(containerId, nodeId, node.getRMNode() - .getHttpAddress(), capability, priority, null); - } - @VisibleForTesting public RMContainer findNodeToUnreserve(Resource clusterResource, FiCaSchedulerNode node, Priority priority, @@ -672,203 +433,63 @@ public class FiCaSchedulerApp extends SchedulerApplicationAttempt { return nodeToUnreserve.getReservedContainer(); } - private LeafQueue getCSLeafQueue() { + public LeafQueue getCSLeafQueue() { return (LeafQueue)queue; } - private CSAssignment assignContainer(Resource clusterResource, FiCaSchedulerNode node, - Priority priority, - ResourceRequest request, NodeType type, RMContainer rmContainer, - MutableObject createdContainer, SchedulingMode schedulingMode, - ResourceLimits currentResoureLimits) { - if (LOG.isDebugEnabled()) { - LOG.debug("assignContainers: node=" + node.getNodeName() - + " application=" + getApplicationId() - + " priority=" + priority.getPriority() - + " request=" + request + " type=" + type); - } + private CSAssignment getCSAssignmentFromAllocateResult( + Resource clusterResource, ContainerAllocation result) { + // Handle skipped + boolean skipped = + (result.getAllocationState() == AllocationState.APP_SKIPPED); + CSAssignment assignment = new CSAssignment(skipped); + assignment.setApplication(this); + + // Handle excess reservation + assignment.setExcessReservation(result.getContainerToBeUnreserved()); - // check if the resource request can access the label - if (!SchedulerUtils.checkResourceRequestMatchingNodePartition(request, - node.getPartition(), schedulingMode)) { - // this is a reserved container, but we cannot allocate it now according - // to label not match. This can be caused by node label changed - // We should un-reserve this container. 
- if (rmContainer != null) { - unreserve(priority, node, rmContainer); - } - return new CSAssignment(Resources.none(), type); - } + // If we allocated something + if (Resources.greaterThan(rc, clusterResource, + result.getResourceToBeAllocated(), Resources.none())) { + Resource allocatedResource = result.getResourceToBeAllocated(); + Container updatedContainer = result.getUpdatedContainer(); + + assignment.setResource(allocatedResource); + assignment.setType(result.getContainerNodeType()); - Resource capability = request.getCapability(); - Resource available = node.getAvailableResource(); - Resource totalResource = node.getTotalResource(); - - if (!Resources.lessThanOrEqual(rc, clusterResource, - capability, totalResource)) { - LOG.warn("Node : " + node.getNodeID() - + " does not have sufficient resource for request : " + request - + " node total capability : " + node.getTotalResource()); - return new CSAssignment(Resources.none(), type); - } - - assert Resources.greaterThan( - rc, clusterResource, available, Resources.none()); - - // Create the container if necessary - Container container = - getContainer(rmContainer, node, capability, priority); - - // something went wrong getting/creating the container - if (container == null) { - LOG.warn("Couldn't get container for allocation!"); - return new CSAssignment(Resources.none(), type); - } - - boolean shouldAllocOrReserveNewContainer = shouldAllocOrReserveNewContainer( - priority, capability); - - // Can we allocate a container on this node? - int availableContainers = - rc.computeAvailableContainers(available, capability); - - // How much need to unreserve equals to: - // max(required - headroom, amountNeedUnreserve) - Resource resourceNeedToUnReserve = - Resources.max(rc, clusterResource, - Resources.subtract(capability, currentResoureLimits.getHeadroom()), - currentResoureLimits.getAmountNeededUnreserve()); - - boolean needToUnreserve = - Resources.greaterThan(rc, clusterResource, - resourceNeedToUnReserve, Resources.none()); - - RMContainer unreservedContainer = null; - boolean reservationsContinueLooking = - getCSLeafQueue().getReservationContinueLooking(); - - if (availableContainers > 0) { - // Allocate... - - // Did we previously reserve containers at this 'priority'? - if (rmContainer != null) { - unreserve(priority, node, rmContainer); - } else if (reservationsContinueLooking && node.getLabels().isEmpty()) { - // when reservationsContinueLooking is set, we may need to unreserve - // some containers to meet this queue, its parents', or the users' resource limits. - // TODO, need change here when we want to support continuous reservation - // looking for labeled partitions. - if (!shouldAllocOrReserveNewContainer || needToUnreserve) { - if (!needToUnreserve) { - // If we shouldn't allocate/reserve new container then we should - // unreserve one the same size we are asking for since the - // currentResoureLimits.getAmountNeededUnreserve could be zero. If - // the limit was hit then use the amount we need to unreserve to be - // under the limit. - resourceNeedToUnReserve = capability; - } - unreservedContainer = - findNodeToUnreserve(clusterResource, node, priority, - resourceNeedToUnReserve); - // When (minimum-unreserved-resource > 0 OR we cannot allocate new/reserved - // container (That means we *have to* unreserve some resource to - // continue)). If we failed to unreserve some resource, we can't continue. 
- if (null == unreservedContainer) { - return new CSAssignment(Resources.none(), type); - } - } - } - - // Inform the application - RMContainer allocatedContainer = - allocate(type, node, priority, request, container); - - // Does the application need this resource? - if (allocatedContainer == null) { - CSAssignment csAssignment = new CSAssignment(Resources.none(), type); - csAssignment.setApplication(this); - csAssignment.setExcessReservation(unreservedContainer); - return csAssignment; - } - - // Inform the node - node.allocateContainer(allocatedContainer); - - // Inform the ordering policy - getCSLeafQueue().getOrderingPolicy().containerAllocated(this, - allocatedContainer); - - LOG.info("assignedContainer" + - " application attempt=" + getApplicationAttemptId() + - " container=" + container + - " queue=" + this + - " clusterResource=" + clusterResource); - createdContainer.setValue(allocatedContainer); - CSAssignment assignment = new CSAssignment(container.getResource(), type); - assignment.getAssignmentInformation().addAllocationDetails( - container.getId(), getCSLeafQueue().getQueuePath()); - assignment.getAssignmentInformation().incrAllocations(); - assignment.setApplication(this); - Resources.addTo(assignment.getAssignmentInformation().getAllocated(), - container.getResource()); - - assignment.setExcessReservation(unreservedContainer); - return assignment; - } else { - // if we are allowed to allocate but this node doesn't have space, reserve it or - // if this was an already a reserved container, reserve it again - if (shouldAllocOrReserveNewContainer || rmContainer != null) { - - if (reservationsContinueLooking && rmContainer == null) { - // we could possibly ignoring queue capacity or user limits when - // reservationsContinueLooking is set. Make sure we didn't need to unreserve - // one. - if (needToUnreserve) { - if (LOG.isDebugEnabled()) { - LOG.debug("we needed to unreserve to be able to allocate"); - } - return new CSAssignment(Resources.none(), type); - } - } - - // Reserve by 'charging' in advance... 
- reserve(priority, node, rmContainer, container); - - LOG.info("Reserved container " + - " application=" + getApplicationId() + - " resource=" + request.getCapability() + - " queue=" + this.toString() + - " cluster=" + clusterResource); - CSAssignment assignment = - new CSAssignment(request.getCapability(), type); + if (result.getAllocationState() == AllocationState.RESERVED) { + // This is a reserved container + LOG.info("Reserved container " + " application=" + getApplicationId() + + " resource=" + allocatedResource + " queue=" + + this.toString() + " cluster=" + clusterResource); assignment.getAssignmentInformation().addReservationDetails( - container.getId(), getCSLeafQueue().getQueuePath()); + updatedContainer.getId(), getCSLeafQueue().getQueuePath()); assignment.getAssignmentInformation().incrReservations(); Resources.addTo(assignment.getAssignmentInformation().getReserved(), - request.getCapability()); - return assignment; + allocatedResource); + assignment.setFulfilledReservation(true); + } else { + // This is a new container + // Inform the ordering policy + LOG.info("assignedContainer" + " application attempt=" + + getApplicationAttemptId() + " container=" + + updatedContainer.getId() + " queue=" + this + " clusterResource=" + + clusterResource); + + getCSLeafQueue().getOrderingPolicy().containerAllocated(this, + getRMContainer(updatedContainer.getId())); + + assignment.getAssignmentInformation().addAllocationDetails( + updatedContainer.getId(), getCSLeafQueue().getQueuePath()); + assignment.getAssignmentInformation().incrAllocations(); + Resources.addTo(assignment.getAssignmentInformation().getAllocated(), + allocatedResource); } - return new CSAssignment(Resources.none(), type); } + + return assignment; } - - private boolean checkHeadroom(Resource clusterResource, - ResourceLimits currentResourceLimits, Resource required, FiCaSchedulerNode node) { - // If headroom + currentReservation < required, we cannot allocate this - // require - Resource resourceCouldBeUnReserved = getCurrentReservation(); - if (!getCSLeafQueue().getReservationContinueLooking() || !node.getPartition().equals(RMNodeLabelsManager.NO_LABEL)) { - // If we don't allow reservation continuous looking, OR we're looking at - // non-default node partition, we won't allow to unreserve before - // allocation. 
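
Note on the result type consumed above: getCSAssignmentFromAllocateResult translates a ContainerAllocation produced by the per-application containerAllocator into the CSAssignment that the queues expect. The ContainerAllocator/ContainerAllocation classes are added in separate files of YARN-3983 and are not part of this hunk; the following is only a minimal sketch of the shape the code above relies on, assuming an ALLOCATED state alongside the APP_SKIPPED, PRIORITY_SKIPPED and RESERVED states that actually appear in these hunks.

    // Illustrative sketch only -- the real classes ship elsewhere in this patch.
    // Resource, NodeType, RMContainer and Container are the usual YARN scheduler types.
    enum AllocationState {
      APP_SKIPPED,       // skip the whole application on this node
      PRIORITY_SKIPPED,  // skip this priority and try the next one
      RESERVED,          // a container was reserved (or re-reserved)
      ALLOCATED          // assumed name for a successful allocation
    }

    class ContainerAllocation {
      private final AllocationState state;
      private final Resource resourceToBeAllocated;
      private final NodeType containerNodeType;
      private final RMContainer containerToBeUnreserved;
      private final Container updatedContainer;

      ContainerAllocation(AllocationState state, Resource toBeAllocated,
          NodeType type, RMContainer toBeUnreserved, Container updated) {
        this.state = state;
        this.resourceToBeAllocated = toBeAllocated;
        this.containerNodeType = type;
        this.containerToBeUnreserved = toBeUnreserved;
        this.updatedContainer = updated;
      }

      AllocationState getAllocationState() { return state; }
      Resource getResourceToBeAllocated() { return resourceToBeAllocated; }
      NodeType getContainerNodeType() { return containerNodeType; }
      RMContainer getContainerToBeUnreserved() { return containerToBeUnreserved; }
      Container getUpdatedContainer() { return updatedContainer; }
    }
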
- resourceCouldBeUnReserved = Resources.none(); - } - return Resources - .greaterThanOrEqual(rc, clusterResource, Resources.add( - currentResourceLimits.getHeadroom(), resourceCouldBeUnReserved), - required); - } - + public CSAssignment assignContainers(Resource clusterResource, FiCaSchedulerNode node, ResourceLimits currentResourceLimits, SchedulingMode schedulingMode) { @@ -886,174 +507,41 @@ public class FiCaSchedulerApp extends SchedulerApplicationAttempt { + ", because it doesn't need more resource, schedulingMode=" + schedulingMode.name() + " node-label=" + node.getPartition()); } - return SKIP_ASSIGNMENT; + return CSAssignment.SKIP_ASSIGNMENT; } synchronized (this) { - // Check if this resource is on the blacklist - if (SchedulerAppUtils.isBlacklisted(this, node, LOG)) { - return SKIP_ASSIGNMENT; - } - // Schedule in priority order for (Priority priority : getPriorities()) { - ResourceRequest anyRequest = - getResourceRequest(priority, ResourceRequest.ANY); - if (null == anyRequest) { + ContainerAllocation allocationResult = + containerAllocator.allocate(clusterResource, node, + schedulingMode, currentResourceLimits, priority, null); + + // If it's a skipped allocation + AllocationState allocationState = allocationResult.getAllocationState(); + + if (allocationState == AllocationState.PRIORITY_SKIPPED) { continue; } - - // Required resource - Resource required = anyRequest.getCapability(); - - // Do we need containers at this 'priority'? - if (getTotalRequiredResources(priority) <= 0) { - continue; - } - - // AM container allocation doesn't support non-exclusive allocation to - // avoid painful of preempt an AM container - if (schedulingMode == SchedulingMode.IGNORE_PARTITION_EXCLUSIVITY) { - - RMAppAttempt rmAppAttempt = - rmContext.getRMApps() - .get(getApplicationId()).getCurrentAppAttempt(); - if (rmAppAttempt.getSubmissionContext().getUnmanagedAM() == false - && null == rmAppAttempt.getMasterContainer()) { - if (LOG.isDebugEnabled()) { - LOG.debug("Skip allocating AM container to app_attempt=" - + getApplicationAttemptId() - + ", don't allow to allocate AM container in non-exclusive mode"); - } - break; - } - } - - // Is the node-label-expression of this offswitch resource request - // matches the node's label? - // If not match, jump to next priority. - if (!SchedulerUtils.checkResourceRequestMatchingNodePartition( - anyRequest, node.getPartition(), schedulingMode)) { - continue; - } - - if (!getCSLeafQueue().getReservationContinueLooking()) { - if (!shouldAllocOrReserveNewContainer(priority, required)) { - if (LOG.isDebugEnabled()) { - LOG.debug("doesn't need containers based on reservation algo!"); - } - continue; - } - } - - if (!checkHeadroom(clusterResource, currentResourceLimits, required, - node)) { - if (LOG.isDebugEnabled()) { - LOG.debug("cannot allocate required resource=" + required - + " because of headroom"); - } - return NULL_ASSIGNMENT; - } - - // Inform the application it is about to get a scheduling opportunity - addSchedulingOpportunity(priority); - - // Increase missed-non-partitioned-resource-request-opportunity. 
- // This is to make sure non-partitioned-resource-request will prefer - // to be allocated to non-partitioned nodes - int missedNonPartitionedRequestSchedulingOpportunity = 0; - if (anyRequest.getNodeLabelExpression().equals( - RMNodeLabelsManager.NO_LABEL)) { - missedNonPartitionedRequestSchedulingOpportunity = - addMissedNonPartitionedRequestSchedulingOpportunity(priority); - } - - if (schedulingMode == SchedulingMode.IGNORE_PARTITION_EXCLUSIVITY) { - // Before doing allocation, we need to check scheduling opportunity to - // make sure : non-partitioned resource request should be scheduled to - // non-partitioned partition first. - if (missedNonPartitionedRequestSchedulingOpportunity < rmContext - .getScheduler().getNumClusterNodes()) { - if (LOG.isDebugEnabled()) { - LOG.debug("Skip app_attempt=" - + getApplicationAttemptId() + " priority=" - + priority - + " because missed-non-partitioned-resource-request" - + " opportunity under requred:" + " Now=" - + missedNonPartitionedRequestSchedulingOpportunity - + " required=" - + rmContext.getScheduler().getNumClusterNodes()); - } - - return SKIP_ASSIGNMENT; - } - } - - // Try to schedule - CSAssignment assignment = - assignContainersOnNode(clusterResource, node, - priority, null, schedulingMode, currentResourceLimits); - - // Did the application skip this node? - if (assignment.getSkipped()) { - // Don't count 'skipped nodes' as a scheduling opportunity! - subtractSchedulingOpportunity(priority); - continue; - } - - // Did we schedule or reserve a container? - Resource assigned = assignment.getResource(); - if (Resources.greaterThan(rc, clusterResource, - assigned, Resources.none())) { - // Don't reset scheduling opportunities for offswitch assignments - // otherwise the app will be delayed for each non-local assignment. - // This helps apps with many off-cluster requests schedule faster. - if (assignment.getType() != NodeType.OFF_SWITCH) { - if (LOG.isDebugEnabled()) { - LOG.debug("Resetting scheduling opportunities"); - } - resetSchedulingOpportunities(priority); - } - // Non-exclusive scheduling opportunity is different: we need reset - // it every time to make sure non-labeled resource request will be - // most likely allocated on non-labeled nodes first. - resetMissedNonPartitionedRequestSchedulingOpportunity(priority); - - // Done - return assignment; - } else { - // Do not assign out of order w.r.t priorities - return SKIP_ASSIGNMENT; - } + return getCSAssignmentFromAllocateResult(clusterResource, + allocationResult); } } - return SKIP_ASSIGNMENT; + // We will reach here if we skipped all priorities of the app, so we will + // skip the app. + return CSAssignment.SKIP_ASSIGNMENT; } public synchronized CSAssignment assignReservedContainer( FiCaSchedulerNode node, RMContainer rmContainer, Resource clusterResource, SchedulingMode schedulingMode) { - // Do we still need this reservation? - Priority priority = rmContainer.getReservedPriority(); - if (getTotalRequiredResources(priority) == 0) { - // Release - return new CSAssignment(this, rmContainer); - } + ContainerAllocation result = + containerAllocator.allocate(clusterResource, node, + schedulingMode, new ResourceLimits(Resources.none()), + rmContainer.getReservedPriority(), rmContainer); - // Try to assign if we have sufficient resources - CSAssignment tmp = - assignContainersOnNode(clusterResource, node, priority, - rmContainer, schedulingMode, new ResourceLimits(Resources.none())); - - // Doesn't matter... 
since it's already charged for at time of reservation - // "re-reservation" is *free* - CSAssignment ret = new CSAssignment(Resources.none(), NodeType.NODE_LOCAL); - if (tmp.getAssignmentInformation().getNumAllocations() > 0) { - ret.setFulfilledReservation(true); - } - return ret; + return getCSAssignmentFromAllocateResult(clusterResource, result); } - } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java index d63130088fc..f419528e8cf 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java @@ -1489,7 +1489,7 @@ public class TestLeafQueue { ContainerState.COMPLETE, "", ContainerExitStatus.KILLED_BY_RESOURCEMANAGER), RMContainerEventType.KILL, null, true); - CSAssignment assignment = a.assignContainers(clusterResource, node_0, + a.assignContainers(clusterResource, node_0, new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY); assertEquals(4*GB, a.getUsedResources().getMemory()); assertEquals(0*GB, app_0.getCurrentConsumption().getMemory()); @@ -1498,7 +1498,20 @@ public class TestLeafQueue { assertEquals(0*GB, node_0.getUsedResource().getMemory()); } - + private void verifyContainerAllocated(CSAssignment assignment, NodeType nodeType) { + Assert.assertTrue(Resources.greaterThan(resourceCalculator, null, + assignment.getResource(), Resources.none())); + Assert + .assertTrue(assignment.getAssignmentInformation().getNumAllocations() > 0); + Assert.assertEquals(nodeType, assignment.getType()); + } + + private void verifyNoContainerAllocated(CSAssignment assignment) { + Assert.assertTrue(Resources.equals(assignment.getResource(), + Resources.none())); + Assert + .assertTrue(assignment.getAssignmentInformation().getNumAllocations() == 0); + } @Test public void testLocalityScheduling() throws Exception { @@ -1512,11 +1525,11 @@ public class TestLeafQueue { // Submit applications final ApplicationAttemptId appAttemptId_0 = TestUtils.getMockApplicationAttemptId(0, 0); - FiCaSchedulerApp app_0 = - spy(new FiCaSchedulerApp(appAttemptId_0, user_0, a, - mock(ActiveUsersManager.class), spyRMContext)); + FiCaSchedulerApp app_0 = + new FiCaSchedulerApp(appAttemptId_0, user_0, a, + mock(ActiveUsersManager.class), spyRMContext); a.submitApplicationAttempt(app_0, user_0); - + // Setup some nodes and racks String host_0 = "127.0.0.1"; String rack_0 = "rack_0"; @@ -1561,8 +1574,7 @@ public class TestLeafQueue { // Start with off switch, shouldn't allocate due to delay scheduling assignment = a.assignContainers(clusterResource, node_2, new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY); - verify(app_0, never()).allocate(any(NodeType.class), eq(node_2), - any(Priority.class), any(ResourceRequest.class), any(Container.class)); + verifyNoContainerAllocated(assignment); assertEquals(1, app_0.getSchedulingOpportunities(priority)); assertEquals(3, app_0.getTotalRequiredResources(priority)); 
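
The new verifyContainerAllocated / verifyNoContainerAllocated helpers let TestLeafQueue assert directly on the CSAssignment returned by LeafQueue#assignContainers instead of spying on FiCaSchedulerApp#allocate with Mockito. The pattern the remaining test hunks switch to looks roughly like this (illustrative excerpt, reusing the existing test fixtures a, node_0, node_2 and clusterResource):

    // A heartbeat that should hand out a node-local container:
    CSAssignment assignment = a.assignContainers(clusterResource, node_0,
        new ResourceLimits(clusterResource),
        SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
    verifyContainerAllocated(assignment, NodeType.NODE_LOCAL);

    // A heartbeat that should allocate nothing (e.g. delay scheduling or blacklist):
    assignment = a.assignContainers(clusterResource, node_2,
        new ResourceLimits(clusterResource),
        SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
    verifyNoContainerAllocated(assignment);
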
assertEquals(NodeType.NODE_LOCAL, assignment.getType()); // None->NODE_LOCAL @@ -1570,8 +1582,7 @@ public class TestLeafQueue { // Another off switch, shouldn't allocate due to delay scheduling assignment = a.assignContainers(clusterResource, node_2, new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY); - verify(app_0, never()).allocate(any(NodeType.class), eq(node_2), - any(Priority.class), any(ResourceRequest.class), any(Container.class)); + verifyNoContainerAllocated(assignment); assertEquals(2, app_0.getSchedulingOpportunities(priority)); assertEquals(3, app_0.getTotalRequiredResources(priority)); assertEquals(NodeType.NODE_LOCAL, assignment.getType()); // None->NODE_LOCAL @@ -1579,8 +1590,7 @@ public class TestLeafQueue { // Another off switch, shouldn't allocate due to delay scheduling assignment = a.assignContainers(clusterResource, node_2, new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY); - verify(app_0, never()).allocate(any(NodeType.class), eq(node_2), - any(Priority.class), any(ResourceRequest.class), any(Container.class)); + verifyNoContainerAllocated(assignment); assertEquals(3, app_0.getSchedulingOpportunities(priority)); assertEquals(3, app_0.getTotalRequiredResources(priority)); assertEquals(NodeType.NODE_LOCAL, assignment.getType()); // None->NODE_LOCAL @@ -1589,26 +1599,21 @@ public class TestLeafQueue { // since missedOpportunities=3 and reqdContainers=3 assignment = a.assignContainers(clusterResource, node_2, new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY); - verify(app_0).allocate(eq(NodeType.OFF_SWITCH), eq(node_2), - any(Priority.class), any(ResourceRequest.class), any(Container.class)); + verifyContainerAllocated(assignment, NodeType.OFF_SWITCH); assertEquals(4, app_0.getSchedulingOpportunities(priority)); // should NOT reset assertEquals(2, app_0.getTotalRequiredResources(priority)); - assertEquals(NodeType.OFF_SWITCH, assignment.getType()); // NODE_LOCAL - node_0 assignment = a.assignContainers(clusterResource, node_0, new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY); - verify(app_0).allocate(eq(NodeType.NODE_LOCAL), eq(node_0), - any(Priority.class), any(ResourceRequest.class), any(Container.class)); + verifyContainerAllocated(assignment, NodeType.NODE_LOCAL); assertEquals(0, app_0.getSchedulingOpportunities(priority)); // should reset assertEquals(1, app_0.getTotalRequiredResources(priority)); - assertEquals(NodeType.NODE_LOCAL, assignment.getType()); // NODE_LOCAL - node_1 assignment = a.assignContainers(clusterResource, node_1, new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY); - verify(app_0).allocate(eq(NodeType.NODE_LOCAL), eq(node_1), - any(Priority.class), any(ResourceRequest.class), any(Container.class)); + verifyContainerAllocated(assignment, NodeType.NODE_LOCAL); assertEquals(0, app_0.getSchedulingOpportunities(priority)); // should reset assertEquals(0, app_0.getTotalRequiredResources(priority)); assertEquals(NodeType.NODE_LOCAL, assignment.getType()); @@ -1638,16 +1643,13 @@ public class TestLeafQueue { new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY); assertEquals(1, app_0.getSchedulingOpportunities(priority)); assertEquals(2, app_0.getTotalRequiredResources(priority)); - assertEquals(NodeType.NODE_LOCAL, assignment.getType()); // None->NODE_LOCAL // Should assign RACK_LOCAL now assignment = a.assignContainers(clusterResource, node_3, new 
ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY); - verify(app_0).allocate(eq(NodeType.RACK_LOCAL), eq(node_3), - any(Priority.class), any(ResourceRequest.class), any(Container.class)); + verifyContainerAllocated(assignment, NodeType.RACK_LOCAL); assertEquals(0, app_0.getSchedulingOpportunities(priority)); // should reset assertEquals(1, app_0.getTotalRequiredResources(priority)); - assertEquals(NodeType.RACK_LOCAL, assignment.getType()); } @Test @@ -1661,9 +1663,9 @@ public class TestLeafQueue { // Submit applications final ApplicationAttemptId appAttemptId_0 = TestUtils.getMockApplicationAttemptId(0, 0); - FiCaSchedulerApp app_0 = - spy(new FiCaSchedulerApp(appAttemptId_0, user_0, a, - mock(ActiveUsersManager.class), spyRMContext)); + FiCaSchedulerApp app_0 = + new FiCaSchedulerApp(appAttemptId_0, user_0, a, + mock(ActiveUsersManager.class), spyRMContext); a.submitApplicationAttempt(app_0, user_0); // Setup some nodes and racks @@ -1723,63 +1725,48 @@ public class TestLeafQueue { // Start with off switch, shouldn't allocate P1 due to delay scheduling // thus, no P2 either! - a.assignContainers(clusterResource, node_2, + CSAssignment assignment = a.assignContainers(clusterResource, node_2, new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY); - verify(app_0, never()).allocate(any(NodeType.class), eq(node_2), - eq(priority_1), any(ResourceRequest.class), any(Container.class)); + verifyNoContainerAllocated(assignment); assertEquals(1, app_0.getSchedulingOpportunities(priority_1)); assertEquals(2, app_0.getTotalRequiredResources(priority_1)); - verify(app_0, never()).allocate(any(NodeType.class), eq(node_2), - eq(priority_2), any(ResourceRequest.class), any(Container.class)); assertEquals(0, app_0.getSchedulingOpportunities(priority_2)); assertEquals(1, app_0.getTotalRequiredResources(priority_2)); // Another off-switch, shouldn't allocate P1 due to delay scheduling // thus, no P2 either! 
- a.assignContainers(clusterResource, node_2, + assignment = a.assignContainers(clusterResource, node_2, new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY); - verify(app_0, never()).allocate(any(NodeType.class), eq(node_2), - eq(priority_1), any(ResourceRequest.class), any(Container.class)); + verifyNoContainerAllocated(assignment); assertEquals(2, app_0.getSchedulingOpportunities(priority_1)); assertEquals(2, app_0.getTotalRequiredResources(priority_1)); - verify(app_0, never()).allocate(any(NodeType.class), eq(node_2), - eq(priority_2), any(ResourceRequest.class), any(Container.class)); assertEquals(0, app_0.getSchedulingOpportunities(priority_2)); assertEquals(1, app_0.getTotalRequiredResources(priority_2)); // Another off-switch, shouldn't allocate OFF_SWITCH P1 - a.assignContainers(clusterResource, node_2, + assignment = a.assignContainers(clusterResource, node_2, new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY); - verify(app_0).allocate(eq(NodeType.OFF_SWITCH), eq(node_2), - eq(priority_1), any(ResourceRequest.class), any(Container.class)); + verifyContainerAllocated(assignment, NodeType.OFF_SWITCH); assertEquals(3, app_0.getSchedulingOpportunities(priority_1)); assertEquals(1, app_0.getTotalRequiredResources(priority_1)); - verify(app_0, never()).allocate(any(NodeType.class), eq(node_2), - eq(priority_2), any(ResourceRequest.class), any(Container.class)); assertEquals(0, app_0.getSchedulingOpportunities(priority_2)); assertEquals(1, app_0.getTotalRequiredResources(priority_2)); // Now, DATA_LOCAL for P1 - a.assignContainers(clusterResource, node_0, + assignment = a.assignContainers(clusterResource, node_0, new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY); - verify(app_0).allocate(eq(NodeType.NODE_LOCAL), eq(node_0), - eq(priority_1), any(ResourceRequest.class), any(Container.class)); + verifyContainerAllocated(assignment, NodeType.NODE_LOCAL); assertEquals(0, app_0.getSchedulingOpportunities(priority_1)); assertEquals(0, app_0.getTotalRequiredResources(priority_1)); - verify(app_0, never()).allocate(any(NodeType.class), eq(node_0), - eq(priority_2), any(ResourceRequest.class), any(Container.class)); assertEquals(0, app_0.getSchedulingOpportunities(priority_2)); assertEquals(1, app_0.getTotalRequiredResources(priority_2)); // Now, OFF_SWITCH for P2 - a.assignContainers(clusterResource, node_1, + assignment = a.assignContainers(clusterResource, node_1, new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY); - verify(app_0, never()).allocate(any(NodeType.class), eq(node_1), - eq(priority_1), any(ResourceRequest.class), any(Container.class)); + verifyContainerAllocated(assignment, NodeType.OFF_SWITCH); assertEquals(0, app_0.getSchedulingOpportunities(priority_1)); assertEquals(0, app_0.getTotalRequiredResources(priority_1)); - verify(app_0).allocate(eq(NodeType.OFF_SWITCH), eq(node_1), - eq(priority_2), any(ResourceRequest.class), any(Container.class)); assertEquals(1, app_0.getSchedulingOpportunities(priority_2)); assertEquals(0, app_0.getTotalRequiredResources(priority_2)); @@ -1798,8 +1785,8 @@ public class TestLeafQueue { final ApplicationAttemptId appAttemptId_0 = TestUtils.getMockApplicationAttemptId(0, 0); FiCaSchedulerApp app_0 = - spy(new FiCaSchedulerApp(appAttemptId_0, user_0, a, - mock(ActiveUsersManager.class), spyRMContext)); + new FiCaSchedulerApp(appAttemptId_0, user_0, a, + mock(ActiveUsersManager.class), spyRMContext); 
a.submitApplicationAttempt(app_0, user_0); // Setup some nodes and racks @@ -1849,19 +1836,17 @@ public class TestLeafQueue { app_0.updateResourceRequests(app_0_requests_0); // NODE_LOCAL - node_0_1 - a.assignContainers(clusterResource, node_0_0, + CSAssignment assignment = a.assignContainers(clusterResource, node_0_0, new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY); - verify(app_0).allocate(eq(NodeType.NODE_LOCAL), eq(node_0_0), - any(Priority.class), any(ResourceRequest.class), any(Container.class)); + verifyContainerAllocated(assignment, NodeType.NODE_LOCAL); assertEquals(0, app_0.getSchedulingOpportunities(priority)); // should reset assertEquals(0, app_0.getTotalRequiredResources(priority)); // No allocation on node_1_0 even though it's node/rack local since // required(ANY) == 0 - a.assignContainers(clusterResource, node_1_0, + assignment = a.assignContainers(clusterResource, node_1_0, new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY); - verify(app_0, never()).allocate(any(NodeType.class), eq(node_1_0), - any(Priority.class), any(ResourceRequest.class), any(Container.class)); + verifyNoContainerAllocated(assignment); assertEquals(0, app_0.getSchedulingOpportunities(priority)); // Still zero // since #req=0 assertEquals(0, app_0.getTotalRequiredResources(priority)); @@ -1875,21 +1860,18 @@ public class TestLeafQueue { // No allocation on node_0_1 even though it's node/rack local since // required(rack_1) == 0 - a.assignContainers(clusterResource, node_0_1, + assignment = a.assignContainers(clusterResource, node_0_1, new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY); - verify(app_0, never()).allocate(any(NodeType.class), eq(node_1_0), - any(Priority.class), any(ResourceRequest.class), any(Container.class)); + verifyNoContainerAllocated(assignment); assertEquals(1, app_0.getSchedulingOpportunities(priority)); assertEquals(1, app_0.getTotalRequiredResources(priority)); // NODE_LOCAL - node_1 - a.assignContainers(clusterResource, node_1_0, + assignment = a.assignContainers(clusterResource, node_1_0, new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY); - verify(app_0).allocate(eq(NodeType.NODE_LOCAL), eq(node_1_0), - any(Priority.class), any(ResourceRequest.class), any(Container.class)); + verifyContainerAllocated(assignment, NodeType.NODE_LOCAL); assertEquals(0, app_0.getSchedulingOpportunities(priority)); // should reset assertEquals(0, app_0.getTotalRequiredResources(priority)); - } @Test (timeout = 30000) @@ -2067,16 +2049,16 @@ public class TestLeafQueue { // Submit applications final ApplicationAttemptId appAttemptId_0 = TestUtils.getMockApplicationAttemptId(0, 0); - FiCaSchedulerApp app_0 = - spy(new FiCaSchedulerApp(appAttemptId_0, user_0, a, - mock(ActiveUsersManager.class), spyRMContext)); + FiCaSchedulerApp app_0 = + new FiCaSchedulerApp(appAttemptId_0, user_0, a, + mock(ActiveUsersManager.class), spyRMContext); a.submitApplicationAttempt(app_0, user_0); final ApplicationAttemptId appAttemptId_1 = TestUtils.getMockApplicationAttemptId(1, 0); - FiCaSchedulerApp app_1 = - spy(new FiCaSchedulerApp(appAttemptId_1, user_0, a, - mock(ActiveUsersManager.class), spyRMContext)); + FiCaSchedulerApp app_1 = + new FiCaSchedulerApp(appAttemptId_1, user_0, a, + mock(ActiveUsersManager.class), spyRMContext); a.submitApplicationAttempt(app_1, user_0); // Setup some nodes and racks @@ -2136,10 +2118,10 @@ public class TestLeafQueue { // node_0_1 // Shouldn't 
allocate since RR(rack_0) = null && RR(ANY) = relax: false - a.assignContainers(clusterResource, node_0_1, - new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY); - verify(app_0, never()).allocate(any(NodeType.class), eq(node_0_1), - any(Priority.class), any(ResourceRequest.class), any(Container.class)); + CSAssignment assignment = + a.assignContainers(clusterResource, node_0_1, new ResourceLimits( + clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY); + verifyNoContainerAllocated(assignment); assertEquals(0, app_0.getSchedulingOpportunities(priority)); // should be 0 // resourceName: @@ -2159,10 +2141,9 @@ public class TestLeafQueue { // node_1_1 // Shouldn't allocate since RR(rack_1) = relax: false - a.assignContainers(clusterResource, node_1_1, + assignment = a.assignContainers(clusterResource, node_1_1, new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY); - verify(app_0, never()).allocate(any(NodeType.class), eq(node_0_1), - any(Priority.class), any(ResourceRequest.class), any(Container.class)); + verifyNoContainerAllocated(assignment); assertEquals(0, app_0.getSchedulingOpportunities(priority)); // should be 0 // Allow rack-locality for rack_1, but blacklist node_1_1 @@ -2190,10 +2171,9 @@ public class TestLeafQueue { // node_1_1 // Shouldn't allocate since node_1_1 is blacklisted - a.assignContainers(clusterResource, node_1_1, + assignment = a.assignContainers(clusterResource, node_1_1, new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY); - verify(app_0, never()).allocate(any(NodeType.class), eq(node_1_1), - any(Priority.class), any(ResourceRequest.class), any(Container.class)); + verifyNoContainerAllocated(assignment); assertEquals(0, app_0.getSchedulingOpportunities(priority)); // should be 0 // Now, remove node_1_1 from blacklist, but add rack_1 to blacklist @@ -2219,10 +2199,9 @@ public class TestLeafQueue { // node_1_1 // Shouldn't allocate since rack_1 is blacklisted - a.assignContainers(clusterResource, node_1_1, + assignment = a.assignContainers(clusterResource, node_1_1, new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY); - verify(app_0, never()).allocate(any(NodeType.class), eq(node_1_1), - any(Priority.class), any(ResourceRequest.class), any(Container.class)); + verifyNoContainerAllocated(assignment); assertEquals(0, app_0.getSchedulingOpportunities(priority)); // should be 0 // Now remove rack_1 from blacklist @@ -2246,10 +2225,9 @@ public class TestLeafQueue { // Blacklist: < host_0_0 > <---- // Now, should allocate since RR(rack_1) = relax: true - a.assignContainers(clusterResource, node_1_1, + assignment = a.assignContainers(clusterResource, node_1_1, new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY); - verify(app_0,never()).allocate(eq(NodeType.RACK_LOCAL), eq(node_1_1), - any(Priority.class), any(ResourceRequest.class), any(Container.class)); + verifyNoContainerAllocated(assignment); assertEquals(0, app_0.getSchedulingOpportunities(priority)); assertEquals(1, app_0.getTotalRequiredResources(priority)); @@ -2277,10 +2255,9 @@ public class TestLeafQueue { // host_1_0: 8G // host_1_1: 7G - a.assignContainers(clusterResource, node_1_0, + assignment = a.assignContainers(clusterResource, node_1_0, new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY); - verify(app_0).allocate(eq(NodeType.NODE_LOCAL), eq(node_1_0), - any(Priority.class), any(ResourceRequest.class), 
any(Container.class)); + verifyContainerAllocated(assignment, NodeType.NODE_LOCAL); assertEquals(0, app_0.getSchedulingOpportunities(priority)); assertEquals(0, app_0.getTotalRequiredResources(priority));
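
Taken together, the FiCaSchedulerApp changes in this patch reduce its two scheduling entry points to thin wrappers around the ContainerAllocator; the locality cascade, blacklist and headroom checks removed above are relocated into the new allocator classes rather than deleted. A rough skeleton of the refactored entry points, abridged from the hunks above (the real assignContainers also keeps its earlier "doesn't need more resource" short-circuit):

    public CSAssignment assignContainers(Resource clusterResource,
        FiCaSchedulerNode node, ResourceLimits currentResourceLimits,
        SchedulingMode schedulingMode) {
      synchronized (this) {
        // Schedule in priority order
        for (Priority priority : getPriorities()) {
          ContainerAllocation result = containerAllocator.allocate(
              clusterResource, node, schedulingMode, currentResourceLimits,
              priority, null);
          if (result.getAllocationState() == AllocationState.PRIORITY_SKIPPED) {
            continue; // nothing to do at this priority, try the next one
          }
          // Allocated, reserved or app-skipped: convert and return.
          return getCSAssignmentFromAllocateResult(clusterResource, result);
        }
      }
      // Every priority was skipped, so skip the whole application.
      return CSAssignment.SKIP_ASSIGNMENT;
    }

    public synchronized CSAssignment assignReservedContainer(
        FiCaSchedulerNode node, RMContainer rmContainer,
        Resource clusterResource, SchedulingMode schedulingMode) {
      ContainerAllocation result = containerAllocator.allocate(clusterResource,
          node, schedulingMode, new ResourceLimits(Resources.none()),
          rmContainer.getReservedPriority(), rmContainer);
      return getCSAssignmentFromAllocateResult(clusterResource, result);
    }
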