From 65e7ae5dcf5b5141446b28cfc6a0f828f4eb833a Mon Sep 17 00:00:00 2001 From: Sunil G Date: Fri, 6 Jan 2017 21:30:52 +0530 Subject: [PATCH] YARN-5906. Update AppSchedulingInfo to use SchedulingPlacementSet. Contributed by Wangda Tan. --- .../scheduler/AppSchedulingInfo.java | 425 +++++------------- .../LocalitySchedulingPlacementSet.java | 311 +++++++++++++ .../placement/SchedulingPlacementSet.java | 22 +- .../TestApplicationLimitsByPartition.java | 6 + 4 files changed, 444 insertions(+), 320 deletions(-) create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/LocalitySchedulingPlacementSet.java diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java index 80811b14b3d..b9deb6cdab2 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java @@ -18,22 +18,6 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler; -import java.util.ArrayList; -import java.util.Collection; -import java.util.HashMap; -import java.util.HashSet; -import java.util.Iterator; -import java.util.List; -import java.util.Map; -import java.util.Set; -import java.util.TreeMap; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentSkipListMap; -import java.util.concurrent.atomic.AtomicBoolean; -import 
java.util.concurrent.atomic.AtomicLong; -import java.util.concurrent.locks.ReentrantReadWriteLock; - -import org.apache.commons.collections.IteratorUtils; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.classification.InterfaceAudience.Private; @@ -48,14 +32,28 @@ import org.apache.hadoop.yarn.api.records.ResourceRequest; import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.server.resourcemanager.RMServerUtils; import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager; -import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.PlacementSet; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.LocalitySchedulingPlacementSet; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.ResourceRequestUpdateResult; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.SchedulingPlacementSet; import org.apache.hadoop.yarn.util.resource.Resources; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.TreeMap; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentSkipListMap; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.locks.ReentrantReadWriteLock; + /** * This class keeps track of all the consumption of an application. This also * keeps track of current running/completed containers for the application. 
@@ -87,8 +85,8 @@ public class AppSchedulingInfo { private final ConcurrentSkipListMap schedulerKeys = new ConcurrentSkipListMap<>(); - final Map> - resourceRequestMap = new ConcurrentHashMap<>(); + final Map> + schedulerKeyToPlacementSets = new ConcurrentHashMap<>(); final Map>> containerIncreaseRequestMap = new ConcurrentHashMap<>(); @@ -151,7 +149,7 @@ public class AppSchedulingInfo { */ private void clearRequests() { schedulerKeys.clear(); - resourceRequestMap.clear(); + schedulerKeyToPlacementSets.clear(); LOG.info("Application " + applicationId + " requests cleared"); } @@ -297,7 +295,7 @@ public class AppSchedulingInfo { } } - private void decrementSchedulerKeyReference( + public void decrementSchedulerKeyReference( SchedulerRequestKey schedulerKey) { Integer schedulerKeyCount = schedulerKeys.get(schedulerKey); if (schedulerKeyCount != null) { @@ -389,49 +387,56 @@ public class AppSchedulingInfo { */ public boolean updateResourceRequests(List requests, boolean recoverPreemptedRequestForAContainer) { + if (null == requests || requests.isEmpty()) { + return false; + } + // Flag to track if any incoming requests update "ANY" requests - boolean anyResourcesUpdated = false; + boolean offswitchResourcesUpdated = false; try { this.writeLock.lock(); - // Update resource requests + + // A map to group resource requests and dedup + Map> dedupRequests = + new HashMap<>(); + + // Group resource request by schedulerRequestKey and resourceName for (ResourceRequest request : requests) { SchedulerRequestKey schedulerKey = SchedulerRequestKey.create(request); - String resourceName = request.getResourceName(); + if (!dedupRequests.containsKey(schedulerKey)) { + dedupRequests.put(schedulerKey, + new HashMap()); + } + dedupRequests.get(schedulerKey).put(request.getResourceName(), request); + } - // Update node labels if required - updateNodeLabels(request); + // Update scheduling placement set + for (Map.Entry> entry : dedupRequests.entrySet()) { + SchedulerRequestKey 
schedulerRequestKey = entry.getKey(); - Map asks = - this.resourceRequestMap.get(schedulerKey); - if (asks == null) { - asks = new ConcurrentHashMap<>(); - this.resourceRequestMap.put(schedulerKey, asks); + if (!schedulerKeyToPlacementSets.containsKey(schedulerRequestKey)) { + schedulerKeyToPlacementSets.put(schedulerRequestKey, + new LocalitySchedulingPlacementSet<>(this)); } - // Increment number of containers if recovering preempted resources - ResourceRequest lastRequest = asks.get(resourceName); - if (recoverPreemptedRequestForAContainer && lastRequest != null) { - request.setNumContainers(lastRequest.getNumContainers() + 1); - } + // Update placement set + ResourceRequestUpdateResult pendingAmountChanges = + schedulerKeyToPlacementSets.get(schedulerRequestKey) + .updateResourceRequests( + entry.getValue().values(), + recoverPreemptedRequestForAContainer); - // Update asks - asks.put(resourceName, request); - - if (resourceName.equals(ResourceRequest.ANY)) { - //update the applications requested labels set - requestedPartitions.add(request.getNodeLabelExpression() == null - ? 
RMNodeLabelsManager.NO_LABEL : - request.getNodeLabelExpression()); - - anyResourcesUpdated = true; - - // Update pendingResources - updatePendingResources(lastRequest, request, schedulerKey, + if (null != pendingAmountChanges) { + updatePendingResources( + pendingAmountChanges.getLastAnyResourceRequest(), + pendingAmountChanges.getNewResourceRequest(), schedulerRequestKey, queue.getMetrics()); + offswitchResourcesUpdated = true; } } - return anyResourcesUpdated; + + return offswitchResourcesUpdated; } finally { this.writeLock.unlock(); } @@ -481,35 +486,13 @@ public class AppSchedulingInfo { } } - private void updateNodeLabels(ResourceRequest request) { - SchedulerRequestKey schedulerKey = SchedulerRequestKey.create(request); - String resourceName = request.getResourceName(); - if (resourceName.equals(ResourceRequest.ANY)) { - ResourceRequest previousAnyRequest = - getResourceRequest(schedulerKey, resourceName); + public void addRequestedPartition(String partition) { + requestedPartitions.add(partition); + } - // When there is change in ANY request label expression, we should - // update label for all resource requests already added of same - // priority as ANY resource request. 
- if ((null == previousAnyRequest) - || hasRequestLabelChanged(previousAnyRequest, request)) { - Map resourceRequest = - getResourceRequests(schedulerKey); - if (resourceRequest != null) { - for (ResourceRequest r : resourceRequest.values()) { - if (!r.getResourceName().equals(ResourceRequest.ANY)) { - r.setNodeLabelExpression(request.getNodeLabelExpression()); - } - } - } - } - } else { - ResourceRequest anyRequest = - getResourceRequest(schedulerKey, ResourceRequest.ANY); - if (anyRequest != null) { - request.setNodeLabelExpression(anyRequest.getNodeLabelExpression()); - } - } + public void decPendingResource(String partition, Resource toDecrease) { + queue.decPendingResource(partition, toDecrease); + appResourceUsage.decPending(partition, toDecrease); } private boolean hasRequestLabelChanged(ResourceRequest requestOne, @@ -582,17 +565,22 @@ public class AppSchedulingInfo { return schedulerKeys.keySet(); } + @SuppressWarnings("unchecked") public Map getResourceRequests( SchedulerRequestKey schedulerKey) { - return resourceRequestMap.get(schedulerKey); + SchedulingPlacementSet ps = schedulerKeyToPlacementSets.get(schedulerKey); + if (null != ps) { + return ps.getResourceRequests(); + } + return Collections.emptyMap(); } public List getAllResourceRequests() { List ret = new ArrayList<>(); try { this.readLock.lock(); - for (Map r : resourceRequestMap.values()) { - ret.addAll(r.values()); + for (SchedulingPlacementSet ps : schedulerKeyToPlacementSets.values()) { + ret.addAll(ps.getResourceRequests().values()); } } finally { this.readLock.unlock(); @@ -604,9 +592,9 @@ public class AppSchedulingInfo { String resourceName) { try { this.readLock.lock(); - Map nodeRequests = - resourceRequestMap.get(schedulerKey); - return (nodeRequests == null) ? null : nodeRequests.get(resourceName); + SchedulingPlacementSet ps = + schedulerKeyToPlacementSets.get(schedulerKey); + return (ps == null) ? 
null : ps.getResourceRequest(resourceName); } finally { this.readLock.unlock(); } @@ -698,141 +686,29 @@ public class AppSchedulingInfo { public List allocate(NodeType type, SchedulerNode node, SchedulerRequestKey schedulerKey, + ResourceRequest request, Container containerAllocated) { try { writeLock.lock(); - ResourceRequest request; - if (type == NodeType.NODE_LOCAL) { - request = resourceRequestMap.get(schedulerKey).get(node.getNodeName()); - } else if (type == NodeType.RACK_LOCAL) { - request = resourceRequestMap.get(schedulerKey).get(node.getRackName()); - } else{ - request = resourceRequestMap.get(schedulerKey).get(ResourceRequest.ANY); - } - return allocate(type, node, schedulerKey, request, containerAllocated); - } finally { - writeLock.unlock(); - } - } - - /** - * Resources have been allocated to this application by the resource - * scheduler. Track them. - * @param type Node Type - * @param node SchedulerNode - * @param schedulerKey SchedulerRequestKey - * @param request ResourceRequest - * @param containerAllocated Container Allocated - * @return List of ResourceRequests - */ - public List allocate(NodeType type, - SchedulerNode node, SchedulerRequestKey schedulerKey, - ResourceRequest request, Container containerAllocated) { - try { - writeLock.lock(); - List resourceRequests = new ArrayList<>(); - if (type == NodeType.NODE_LOCAL) { - allocateNodeLocal(node, schedulerKey, request, resourceRequests); - } else if (type == NodeType.RACK_LOCAL) { - allocateRackLocal(node, schedulerKey, request, resourceRequests); - } else{ - allocateOffSwitch(request, resourceRequests, schedulerKey); - } if (null != containerAllocated) { - updateMetricsForAllocatedContainer(request, type, containerAllocated); + updateMetricsForAllocatedContainer(type, containerAllocated); } - return resourceRequests; + + return schedulerKeyToPlacementSets.get(schedulerKey).allocate(type, node, + request); } finally { writeLock.unlock(); } } - /** - * The {@link ResourceScheduler} is 
allocating data-local resources to the - * application. - */ - private void allocateNodeLocal(SchedulerNode node, - SchedulerRequestKey schedulerKey, ResourceRequest nodeLocalRequest, - List resourceRequests) { - // Update future requirements - decResourceRequest(node.getNodeName(), schedulerKey, nodeLocalRequest); - - ResourceRequest rackLocalRequest = resourceRequestMap.get(schedulerKey).get( - node.getRackName()); - decResourceRequest(node.getRackName(), schedulerKey, rackLocalRequest); - - ResourceRequest offRackRequest = resourceRequestMap.get(schedulerKey).get( - ResourceRequest.ANY); - decrementOutstanding(offRackRequest, schedulerKey); - - // Update cloned NodeLocal, RackLocal and OffRack requests for recovery - resourceRequests.add(cloneResourceRequest(nodeLocalRequest)); - resourceRequests.add(cloneResourceRequest(rackLocalRequest)); - resourceRequests.add(cloneResourceRequest(offRackRequest)); + public List allocate(NodeType type, + SchedulerNode node, SchedulerRequestKey schedulerKey, + Container containerAllocated) { + return allocate(type, node, schedulerKey, null, containerAllocated); } - private void decResourceRequest(String resourceName, - SchedulerRequestKey schedulerKey, ResourceRequest request) { - request.setNumContainers(request.getNumContainers() - 1); - if (request.getNumContainers() == 0) { - resourceRequestMap.get(schedulerKey).remove(resourceName); - } - } - - /** - * The {@link ResourceScheduler} is allocating data-local resources to the - * application. 
- */ - private void allocateRackLocal(SchedulerNode node, - SchedulerRequestKey schedulerKey, ResourceRequest rackLocalRequest, - List resourceRequests) { - // Update future requirements - decResourceRequest(node.getRackName(), schedulerKey, rackLocalRequest); - - ResourceRequest offRackRequest = resourceRequestMap.get(schedulerKey).get( - ResourceRequest.ANY); - decrementOutstanding(offRackRequest, schedulerKey); - - // Update cloned RackLocal and OffRack requests for recovery - resourceRequests.add(cloneResourceRequest(rackLocalRequest)); - resourceRequests.add(cloneResourceRequest(offRackRequest)); - } - - /** - * The {@link ResourceScheduler} is allocating data-local resources to the - * application. - */ - private void allocateOffSwitch(ResourceRequest offSwitchRequest, - List resourceRequests, - SchedulerRequestKey schedulerKey) { - // Update future requirements - decrementOutstanding(offSwitchRequest, schedulerKey); - // Update cloned OffRack requests for recovery - resourceRequests.add(cloneResourceRequest(offSwitchRequest)); - } - - private void decrementOutstanding(ResourceRequest offSwitchRequest, - SchedulerRequestKey schedulerKey) { - int numOffSwitchContainers = offSwitchRequest.getNumContainers() - 1; - - // Do not remove ANY - offSwitchRequest.setNumContainers(numOffSwitchContainers); - - // Do we have any outstanding requests? 
- // If there is nothing, we need to deactivate this application - if (numOffSwitchContainers == 0) { - decrementSchedulerKeyReference(schedulerKey); - checkForDeactivation(); - } - - appResourceUsage.decPending(offSwitchRequest.getNodeLabelExpression(), - offSwitchRequest.getCapability()); - queue.decPendingResource(offSwitchRequest.getNodeLabelExpression(), - offSwitchRequest.getCapability()); - } - - private void checkForDeactivation() { + public void checkForDeactivation() { if (schedulerKeys.isEmpty()) { activeUsersManager.deactivateApplication(user, applicationId); } @@ -843,9 +719,9 @@ public class AppSchedulingInfo { this.writeLock.lock(); QueueMetrics oldMetrics = queue.getMetrics(); QueueMetrics newMetrics = newQueue.getMetrics(); - for (Map asks : resourceRequestMap.values()) { - ResourceRequest request = asks.get(ResourceRequest.ANY); - if (request != null) { + for (SchedulingPlacementSet ps : schedulerKeyToPlacementSets.values()) { + ResourceRequest request = ps.getResourceRequest(ResourceRequest.ANY); + if (request != null && request.getNumContainers() > 0) { oldMetrics.decrPendingResources(user, request.getNumContainers(), request.getCapability()); newMetrics.incrPendingResources(user, request.getNumContainers(), @@ -874,9 +750,9 @@ public class AppSchedulingInfo { try { this.writeLock.lock(); QueueMetrics metrics = queue.getMetrics(); - for (Map asks : resourceRequestMap.values()) { - ResourceRequest request = asks.get(ResourceRequest.ANY); - if (request != null) { + for (SchedulingPlacementSet ps : schedulerKeyToPlacementSets.values()) { + ResourceRequest request = ps.getResourceRequest(ResourceRequest.ANY); + if (request != null && request.getNumContainers() > 0) { metrics.decrPendingResources(user, request.getNumContainers(), request.getCapability()); @@ -945,17 +821,6 @@ public class AppSchedulingInfo { } } - public ResourceRequest cloneResourceRequest(ResourceRequest request) { - ResourceRequest newRequest = ResourceRequest.newBuilder() - 
.priority(request.getPriority()) - .resourceName(request.getResourceName()) - .capability(request.getCapability()) - .numContainers(1) - .relaxLocality(request.getRelaxLocality()) - .nodeLabelExpression(request.getNodeLabelExpression()).build(); - return newRequest; - } - /* * In async environment, pending resource request could be updated during * scheduling, this method checks pending request before allocating @@ -964,107 +829,43 @@ public class AppSchedulingInfo { SchedulerRequestKey schedulerKey) { try { readLock.lock(); - ResourceRequest r = resourceRequestMap.get(schedulerKey).get( - ResourceRequest.ANY); - if (r == null || r.getNumContainers() <= 0) { + SchedulingPlacementSet ps = schedulerKeyToPlacementSets.get(schedulerKey); + if (null == ps) { return false; } - if (type == NodeType.RACK_LOCAL || type == NodeType.NODE_LOCAL) { - r = resourceRequestMap.get(schedulerKey).get(node.getRackName()); - if (r == null || r.getNumContainers() <= 0) { - return false; - } - if (type == NodeType.NODE_LOCAL) { - r = resourceRequestMap.get(schedulerKey).get(node.getNodeName()); - if (r == null || r.getNumContainers() <= 0) { - return false; - } - } - } - - return true; + return ps.canAllocate(type, node); } finally { readLock.unlock(); } } - public void updateMetricsForAllocatedContainer( - ResourceRequest request, NodeType type, Container containerAllocated) { - try { - writeLock.lock(); - QueueMetrics metrics = queue.getMetrics(); - if (pending) { - // once an allocation is done we assume the application is - // running from scheduler's POV. 
- pending = false; - metrics.runAppAttempt(applicationId, user); - } - - if (LOG.isDebugEnabled()) { - LOG.debug("allocate: applicationId=" + applicationId + " container=" - + containerAllocated.getId() + " host=" + containerAllocated - .getNodeId().toString() + " user=" + user + " resource=" + request - .getCapability() + " type=" + type); - } - metrics.allocateResources(user, 1, request.getCapability(), true); - metrics.incrNodeTypeAggregations(user, type); - } finally { - writeLock.unlock(); + private void updateMetricsForAllocatedContainer( + NodeType type, Container containerAllocated) { + QueueMetrics metrics = queue.getMetrics(); + if (pending) { + // once an allocation is done we assume the application is + // running from scheduler's POV. + pending = false; + metrics.runAppAttempt(applicationId, user); } + + if (LOG.isDebugEnabled()) { + LOG.debug("allocate: applicationId=" + applicationId + " container=" + + containerAllocated.getId() + " host=" + containerAllocated + .getNodeId().toString() + " user=" + user + " resource=" + + containerAllocated.getResource() + " type=" + + type); + } + metrics.allocateResources(user, 1, containerAllocated.getResource(), + true); + metrics.incrNodeTypeAggregations(user, type); } // Get placement-set by specified schedulerKey // Now simply return all node of the input clusterPlacementSet - // TODO, need update this when we support global scheduling public SchedulingPlacementSet getSchedulingPlacementSet( SchedulerRequestKey schedulerkey) { - return new SchedulingPlacementSet() { - @Override - @SuppressWarnings("unchecked") - public Iterator getPreferredNodeIterator( - PlacementSet clusterPlacementSet) { - return IteratorUtils.singletonIterator( - clusterPlacementSet.getAllNodes().values().iterator().next()); - } - - @Override - public ResourceRequestUpdateResult updateResourceRequests( - List requests, - boolean recoverPreemptedRequestForAContainer) { - return null; - } - - @Override - public Map getResourceRequests() { - 
return null; - } - - @Override - public ResourceRequest getResourceRequest(String resourceName, - SchedulerRequestKey requestKey) { - return null; - } - - @Override - public List allocate(NodeType type, SchedulerNode node, - ResourceRequest request) { - return null; - } - - @Override - public Map getAllNodes() { - return null; - } - - @Override - public long getVersion() { - return 0; - } - - @Override - public String getPartition() { - return null; - } - }; + return (SchedulingPlacementSet) schedulerKeyToPlacementSets.get( + schedulerkey); } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/LocalitySchedulingPlacementSet.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/LocalitySchedulingPlacementSet.java new file mode 100644 index 00000000000..ffaad58f2e7 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/LocalitySchedulingPlacementSet.java @@ -0,0 +1,311 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement; + +import org.apache.commons.collections.IteratorUtils; +import org.apache.hadoop.yarn.api.records.ResourceRequest; +import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AppSchedulingInfo; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerRequestKey; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.locks.ReentrantReadWriteLock; + +public class LocalitySchedulingPlacementSet + implements SchedulingPlacementSet { + private final Map resourceRequestMap = + new ConcurrentHashMap<>(); + private AppSchedulingInfo appSchedulingInfo; + + private final ReentrantReadWriteLock.ReadLock readLock; + private final ReentrantReadWriteLock.WriteLock writeLock; + + public LocalitySchedulingPlacementSet(AppSchedulingInfo info) { + ReentrantReadWriteLock lock = new ReentrantReadWriteLock(); + readLock = lock.readLock(); + writeLock = lock.writeLock(); + this.appSchedulingInfo = info; + } + + @Override + @SuppressWarnings("unchecked") + public Iterator getPreferredNodeIterator( + PlacementSet clusterPlacementSet) { + // Now only handle the case that single node in placementSet + // TODO, Add support to multi-hosts inside placement-set which is passed in. 
+ + N singleNode = PlacementSetUtils.getSingleNode(clusterPlacementSet); + if (null != singleNode) { + return IteratorUtils.singletonIterator(singleNode); + } + + return IteratorUtils.emptyIterator(); + } + + private boolean hasRequestLabelChanged(ResourceRequest requestOne, + ResourceRequest requestTwo) { + String requestOneLabelExp = requestOne.getNodeLabelExpression(); + String requestTwoLabelExp = requestTwo.getNodeLabelExpression(); + // First request label expression can be null and second request + // is not null then we have to consider it as changed. + if ((null == requestOneLabelExp) && (null != requestTwoLabelExp)) { + return true; + } + // If the label is not matching between both request when + // requestOneLabelExp is not null. + return ((null != requestOneLabelExp) && !(requestOneLabelExp + .equals(requestTwoLabelExp))); + } + + private void updateNodeLabels(ResourceRequest request) { + String resourceName = request.getResourceName(); + if (resourceName.equals(ResourceRequest.ANY)) { + ResourceRequest previousAnyRequest = + getResourceRequest(resourceName); + + // When there is change in ANY request label expression, we should + // update label for all resource requests already added of same + // priority as ANY resource request. 
+ if ((null == previousAnyRequest) || hasRequestLabelChanged( + previousAnyRequest, request)) { + for (ResourceRequest r : resourceRequestMap.values()) { + if (!r.getResourceName().equals(ResourceRequest.ANY)) { + r.setNodeLabelExpression(request.getNodeLabelExpression()); + } + } + } + } else{ + ResourceRequest anyRequest = getResourceRequest(ResourceRequest.ANY); + if (anyRequest != null) { + request.setNodeLabelExpression(anyRequest.getNodeLabelExpression()); + } + } + } + + @Override + public ResourceRequestUpdateResult updateResourceRequests( + Collection requests, + boolean recoverPreemptedRequestForAContainer) { + try { + this.writeLock.lock(); + + ResourceRequestUpdateResult updateResult = null; + + // Update resource requests + for (ResourceRequest request : requests) { + String resourceName = request.getResourceName(); + + // Update node labels if required + updateNodeLabels(request); + + // Increment number of containers if recovering preempted resources + ResourceRequest lastRequest = resourceRequestMap.get(resourceName); + if (recoverPreemptedRequestForAContainer && lastRequest != null) { + request.setNumContainers(lastRequest.getNumContainers() + 1); + } + + // Update asks + resourceRequestMap.put(resourceName, request); + + if (resourceName.equals(ResourceRequest.ANY)) { + //update the applications requested labels set + appSchedulingInfo.addRequestedPartition( + request.getNodeLabelExpression() == null ? 
+ RMNodeLabelsManager.NO_LABEL : + request.getNodeLabelExpression()); + + updateResult = new ResourceRequestUpdateResult(lastRequest, request); + } + } + return updateResult; + } finally { + this.writeLock.unlock(); + } + } + + @Override + public Map getResourceRequests() { + return resourceRequestMap; + } + + @Override + public ResourceRequest getResourceRequest(String resourceName) { + return resourceRequestMap.get(resourceName); + } + + private void decrementOutstanding(ResourceRequest offSwitchRequest) { + int numOffSwitchContainers = offSwitchRequest.getNumContainers() - 1; + + // Do not remove ANY + offSwitchRequest.setNumContainers(numOffSwitchContainers); + + // Do we have any outstanding requests? + // If there is nothing, we need to deactivate this application + if (numOffSwitchContainers == 0) { + SchedulerRequestKey schedulerRequestKey = SchedulerRequestKey.create( + offSwitchRequest); + appSchedulingInfo.decrementSchedulerKeyReference(schedulerRequestKey); + appSchedulingInfo.checkForDeactivation(); + } + + appSchedulingInfo.decPendingResource( + offSwitchRequest.getNodeLabelExpression(), + offSwitchRequest.getCapability()); + } + + private ResourceRequest cloneResourceRequest(ResourceRequest request) { + ResourceRequest newRequest = + ResourceRequest.newInstance(request.getPriority(), + request.getResourceName(), request.getCapability(), 1, + request.getRelaxLocality(), request.getNodeLabelExpression()); + return newRequest; + } + + /** + * The {@link ResourceScheduler} is allocating data-local resources to the + * application. 
+ */ + private void allocateRackLocal(SchedulerNode node, + ResourceRequest rackLocalRequest, + List resourceRequests) { + // Update future requirements + decResourceRequest(node.getRackName(), rackLocalRequest); + + ResourceRequest offRackRequest = resourceRequestMap.get( + ResourceRequest.ANY); + decrementOutstanding(offRackRequest); + + // Update cloned RackLocal and OffRack requests for recovery + resourceRequests.add(cloneResourceRequest(rackLocalRequest)); + resourceRequests.add(cloneResourceRequest(offRackRequest)); + } + + /** + * The {@link ResourceScheduler} is allocating off-switch resources to the + * application. + */ + private void allocateOffSwitch(ResourceRequest offSwitchRequest, + List resourceRequests) { + // Update future requirements + decrementOutstanding(offSwitchRequest); + // Update cloned OffRack requests for recovery + resourceRequests.add(cloneResourceRequest(offSwitchRequest)); + } + + + /** + * The {@link ResourceScheduler} is allocating data-local resources to the + * application.
+ */ + private void allocateNodeLocal(SchedulerNode node, + ResourceRequest nodeLocalRequest, + List resourceRequests) { + // Update future requirements + decResourceRequest(node.getNodeName(), nodeLocalRequest); + + ResourceRequest rackLocalRequest = resourceRequestMap.get( + node.getRackName()); + decResourceRequest(node.getRackName(), rackLocalRequest); + + ResourceRequest offRackRequest = resourceRequestMap.get( + ResourceRequest.ANY); + decrementOutstanding(offRackRequest); + + // Update cloned NodeLocal, RackLocal and OffRack requests for recovery + resourceRequests.add(cloneResourceRequest(nodeLocalRequest)); + resourceRequests.add(cloneResourceRequest(rackLocalRequest)); + resourceRequests.add(cloneResourceRequest(offRackRequest)); + } + + private void decResourceRequest(String resourceName, + ResourceRequest request) { + request.setNumContainers(request.getNumContainers() - 1); + if (request.getNumContainers() == 0) { + resourceRequestMap.remove(resourceName); + } + } + + @Override + public boolean canAllocate(NodeType type, SchedulerNode node) { + try { + readLock.lock(); + ResourceRequest r = resourceRequestMap.get( + ResourceRequest.ANY); + if (r == null || r.getNumContainers() <= 0) { + return false; + } + if (type == NodeType.RACK_LOCAL || type == NodeType.NODE_LOCAL) { + r = resourceRequestMap.get(node.getRackName()); + if (r == null || r.getNumContainers() <= 0) { + return false; + } + if (type == NodeType.NODE_LOCAL) { + r = resourceRequestMap.get(node.getNodeName()); + if (r == null || r.getNumContainers() <= 0) { + return false; + } + } + } + + return true; + } finally { + readLock.unlock(); + } + } + + @Override + public List allocate(NodeType type, SchedulerNode node, + ResourceRequest request) { + try { + writeLock.lock(); + + List resourceRequests = new ArrayList<>(); + + if (null == request) { + if (type == NodeType.NODE_LOCAL) { + request = resourceRequestMap.get(node.getNodeName()); + } else if (type == NodeType.RACK_LOCAL) { + request = 
resourceRequestMap.get(node.getRackName()); + } else{ + request = resourceRequestMap.get(ResourceRequest.ANY); + } + } + + if (type == NodeType.NODE_LOCAL) { + allocateNodeLocal(node, request, resourceRequests); + } else if (type == NodeType.RACK_LOCAL) { + allocateRackLocal(node, request, resourceRequests); + } else{ + allocateOffSwitch(request, resourceRequests); + } + + return resourceRequests; + } finally { + writeLock.unlock(); + } + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/SchedulingPlacementSet.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/SchedulingPlacementSet.java index f87f7647786..d78e710f16a 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/SchedulingPlacementSet.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/SchedulingPlacementSet.java @@ -23,13 +23,14 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerRequestKey; +import java.util.Collection; import java.util.Iterator; import java.util.List; import java.util.Map; /** *

- * In addition to {@link PlacementSet}, this also maintains + * Compared to {@link PlacementSet}, this also maintains * pending ResourceRequests: * - When new ResourceRequest(s) added to scheduler, or, * - Or new container allocated, scheduler can notify corresponding @@ -42,8 +43,7 @@ import java.util.Map; * can have different ways to order nodes depends on requests. *

*/ -public interface SchedulingPlacementSet - extends PlacementSet { +public interface SchedulingPlacementSet { /** * Get iterator of preferred node depends on requirement and/or availability * @param clusterPlacementSet input cluster PlacementSet @@ -60,7 +60,7 @@ public interface SchedulingPlacementSet * @return true if total pending resource changed */ ResourceRequestUpdateResult updateResourceRequests( - List requests, + Collection requests, boolean recoverPreemptedRequestForAContainer); /** @@ -72,19 +72,25 @@ public interface SchedulingPlacementSet /** * Get ResourceRequest by given schedulerKey and resourceName * @param resourceName resourceName - * @param schedulerRequestKey schedulerRequestKey * @return ResourceRequest */ - ResourceRequest getResourceRequest(String resourceName, - SchedulerRequestKey schedulerRequestKey); + ResourceRequest getResourceRequest(String resourceName); /** * Notify container allocated. * @param type Type of the allocation * @param node Which node this container allocated on - * @param request resource request + * @param request Which resource request to allocate * @return list of ResourceRequests deducted */ List allocate(NodeType type, SchedulerNode node, ResourceRequest request); + + /** + * Check whether we still have a pending requirement for a given NodeType and node + * @param type Locality Type + * @param node which node we will allocate on + * @return true if we have a pending requirement + */ + boolean canAllocate(NodeType type, SchedulerNode node); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationLimitsByPartition.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationLimitsByPartition.java index 5c53fda5604..1f87c533ff3 100644 --- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationLimitsByPartition.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationLimitsByPartition.java @@ -687,6 +687,9 @@ public class TestApplicationLimitsByPartition { List app_0_1_requests = new ArrayList(); app_0_1_requests.add(TestUtils.createResourceRequest(ResourceRequest.ANY, 1 * GB, 2, true, priority_1, recordFactory)); + app_0_1.updateResourceRequests(app_0_1_requests); + + app_0_1_requests.clear(); app_0_1_requests.add(TestUtils.createResourceRequest(ResourceRequest.ANY, 1 * GB, 2, true, priority_1, recordFactory, "y")); app_0_1.updateResourceRequests(app_0_1_requests); @@ -715,6 +718,9 @@ public class TestApplicationLimitsByPartition { List app_1_0_requests = new ArrayList(); app_1_0_requests.add(TestUtils.createResourceRequest(ResourceRequest.ANY, 1 * GB, 2, true, priority_1, recordFactory)); + app_1_0.updateResourceRequests(app_1_0_requests); + + app_1_0_requests.clear(); app_1_0_requests.add(TestUtils.createResourceRequest(ResourceRequest.ANY, 1 * GB, 2, true, priority_1, recordFactory, "y")); app_1_0.updateResourceRequests(app_1_0_requests);