YARN-5906. Update AppSchedulingInfo to use SchedulingPlacementSet. Contributed by Wangda Tan.

This commit is contained in:
Sunil G 2017-01-06 21:30:52 +05:30
parent 6f35700f04
commit 65e7ae5dcf
4 changed files with 444 additions and 320 deletions

View File

@ -18,22 +18,6 @@
package org.apache.hadoop.yarn.server.resourcemanager.scheduler; package org.apache.hadoop.yarn.server.resourcemanager.scheduler;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.apache.commons.collections.IteratorUtils;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience.Private; import org.apache.hadoop.classification.InterfaceAudience.Private;
@ -48,14 +32,28 @@ import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.server.resourcemanager.RMServerUtils; import org.apache.hadoop.yarn.server.resourcemanager.RMServerUtils;
import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager; import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.PlacementSet; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.LocalitySchedulingPlacementSet;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.ResourceRequestUpdateResult; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.ResourceRequestUpdateResult;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.SchedulingPlacementSet; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.SchedulingPlacementSet;
import org.apache.hadoop.yarn.util.resource.Resources; import org.apache.hadoop.yarn.util.resource.Resources;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.ReentrantReadWriteLock;
/** /**
* This class keeps track of all the consumption of an application. This also * This class keeps track of all the consumption of an application. This also
* keeps track of current running/completed containers for the application. * keeps track of current running/completed containers for the application.
@ -87,8 +85,8 @@ public class AppSchedulingInfo {
private final ConcurrentSkipListMap<SchedulerRequestKey, Integer> private final ConcurrentSkipListMap<SchedulerRequestKey, Integer>
schedulerKeys = new ConcurrentSkipListMap<>(); schedulerKeys = new ConcurrentSkipListMap<>();
final Map<SchedulerRequestKey, Map<String, ResourceRequest>> final Map<SchedulerRequestKey, SchedulingPlacementSet<SchedulerNode>>
resourceRequestMap = new ConcurrentHashMap<>(); schedulerKeyToPlacementSets = new ConcurrentHashMap<>();
final Map<NodeId, Map<SchedulerRequestKey, Map<ContainerId, final Map<NodeId, Map<SchedulerRequestKey, Map<ContainerId,
SchedContainerChangeRequest>>> containerIncreaseRequestMap = SchedContainerChangeRequest>>> containerIncreaseRequestMap =
new ConcurrentHashMap<>(); new ConcurrentHashMap<>();
@ -151,7 +149,7 @@ public class AppSchedulingInfo {
*/ */
private void clearRequests() { private void clearRequests() {
schedulerKeys.clear(); schedulerKeys.clear();
resourceRequestMap.clear(); schedulerKeyToPlacementSets.clear();
LOG.info("Application " + applicationId + " requests cleared"); LOG.info("Application " + applicationId + " requests cleared");
} }
@ -297,7 +295,7 @@ public class AppSchedulingInfo {
} }
} }
private void decrementSchedulerKeyReference( public void decrementSchedulerKeyReference(
SchedulerRequestKey schedulerKey) { SchedulerRequestKey schedulerKey) {
Integer schedulerKeyCount = schedulerKeys.get(schedulerKey); Integer schedulerKeyCount = schedulerKeys.get(schedulerKey);
if (schedulerKeyCount != null) { if (schedulerKeyCount != null) {
@ -389,49 +387,56 @@ public class AppSchedulingInfo {
*/ */
public boolean updateResourceRequests(List<ResourceRequest> requests, public boolean updateResourceRequests(List<ResourceRequest> requests,
boolean recoverPreemptedRequestForAContainer) { boolean recoverPreemptedRequestForAContainer) {
if (null == requests || requests.isEmpty()) {
return false;
}
// Flag to track if any incoming requests update "ANY" requests // Flag to track if any incoming requests update "ANY" requests
boolean anyResourcesUpdated = false; boolean offswitchResourcesUpdated = false;
try { try {
this.writeLock.lock(); this.writeLock.lock();
// Update resource requests
// A map to group resource requests and dedup
Map<SchedulerRequestKey, Map<String, ResourceRequest>> dedupRequests =
new HashMap<>();
// Group resource request by schedulerRequestKey and resourceName
for (ResourceRequest request : requests) { for (ResourceRequest request : requests) {
SchedulerRequestKey schedulerKey = SchedulerRequestKey.create(request); SchedulerRequestKey schedulerKey = SchedulerRequestKey.create(request);
String resourceName = request.getResourceName(); if (!dedupRequests.containsKey(schedulerKey)) {
dedupRequests.put(schedulerKey,
// Update node labels if required new HashMap<String, ResourceRequest>());
updateNodeLabels(request); }
dedupRequests.get(schedulerKey).put(request.getResourceName(), request);
Map<String, ResourceRequest> asks =
this.resourceRequestMap.get(schedulerKey);
if (asks == null) {
asks = new ConcurrentHashMap<>();
this.resourceRequestMap.put(schedulerKey, asks);
} }
// Increment number of containers if recovering preempted resources // Update scheduling placement set
ResourceRequest lastRequest = asks.get(resourceName); for (Map.Entry<SchedulerRequestKey, Map<String, ResourceRequest>> entry : dedupRequests.entrySet()) {
if (recoverPreemptedRequestForAContainer && lastRequest != null) { SchedulerRequestKey schedulerRequestKey = entry.getKey();
request.setNumContainers(lastRequest.getNumContainers() + 1);
if (!schedulerKeyToPlacementSets.containsKey(schedulerRequestKey)) {
schedulerKeyToPlacementSets.put(schedulerRequestKey,
new LocalitySchedulingPlacementSet<>(this));
} }
// Update asks // Update placement set
asks.put(resourceName, request); ResourceRequestUpdateResult pendingAmountChanges =
schedulerKeyToPlacementSets.get(schedulerRequestKey)
.updateResourceRequests(
entry.getValue().values(),
recoverPreemptedRequestForAContainer);
if (resourceName.equals(ResourceRequest.ANY)) { if (null != pendingAmountChanges) {
//update the applications requested labels set updatePendingResources(
requestedPartitions.add(request.getNodeLabelExpression() == null pendingAmountChanges.getLastAnyResourceRequest(),
? RMNodeLabelsManager.NO_LABEL : pendingAmountChanges.getNewResourceRequest(), schedulerRequestKey,
request.getNodeLabelExpression());
anyResourcesUpdated = true;
// Update pendingResources
updatePendingResources(lastRequest, request, schedulerKey,
queue.getMetrics()); queue.getMetrics());
offswitchResourcesUpdated = true;
} }
} }
return anyResourcesUpdated;
return offswitchResourcesUpdated;
} finally { } finally {
this.writeLock.unlock(); this.writeLock.unlock();
} }
@ -481,35 +486,13 @@ public class AppSchedulingInfo {
} }
} }
private void updateNodeLabels(ResourceRequest request) { public void addRequestedPartition(String partition) {
SchedulerRequestKey schedulerKey = SchedulerRequestKey.create(request); requestedPartitions.add(partition);
String resourceName = request.getResourceName(); }
if (resourceName.equals(ResourceRequest.ANY)) {
ResourceRequest previousAnyRequest =
getResourceRequest(schedulerKey, resourceName);
// When there is change in ANY request label expression, we should public void decPendingResource(String partition, Resource toDecrease) {
// update label for all resource requests already added of same queue.decPendingResource(partition, toDecrease);
// priority as ANY resource request. appResourceUsage.decPending(partition, toDecrease);
if ((null == previousAnyRequest)
|| hasRequestLabelChanged(previousAnyRequest, request)) {
Map<String, ResourceRequest> resourceRequest =
getResourceRequests(schedulerKey);
if (resourceRequest != null) {
for (ResourceRequest r : resourceRequest.values()) {
if (!r.getResourceName().equals(ResourceRequest.ANY)) {
r.setNodeLabelExpression(request.getNodeLabelExpression());
}
}
}
}
} else {
ResourceRequest anyRequest =
getResourceRequest(schedulerKey, ResourceRequest.ANY);
if (anyRequest != null) {
request.setNodeLabelExpression(anyRequest.getNodeLabelExpression());
}
}
} }
private boolean hasRequestLabelChanged(ResourceRequest requestOne, private boolean hasRequestLabelChanged(ResourceRequest requestOne,
@ -582,17 +565,22 @@ public class AppSchedulingInfo {
return schedulerKeys.keySet(); return schedulerKeys.keySet();
} }
@SuppressWarnings("unchecked")
public Map<String, ResourceRequest> getResourceRequests( public Map<String, ResourceRequest> getResourceRequests(
SchedulerRequestKey schedulerKey) { SchedulerRequestKey schedulerKey) {
return resourceRequestMap.get(schedulerKey); SchedulingPlacementSet ps = schedulerKeyToPlacementSets.get(schedulerKey);
if (null != ps) {
return ps.getResourceRequests();
}
return Collections.emptyMap();
} }
public List<ResourceRequest> getAllResourceRequests() { public List<ResourceRequest> getAllResourceRequests() {
List<ResourceRequest> ret = new ArrayList<>(); List<ResourceRequest> ret = new ArrayList<>();
try { try {
this.readLock.lock(); this.readLock.lock();
for (Map<String, ResourceRequest> r : resourceRequestMap.values()) { for (SchedulingPlacementSet ps : schedulerKeyToPlacementSets.values()) {
ret.addAll(r.values()); ret.addAll(ps.getResourceRequests().values());
} }
} finally { } finally {
this.readLock.unlock(); this.readLock.unlock();
@ -604,9 +592,9 @@ public class AppSchedulingInfo {
String resourceName) { String resourceName) {
try { try {
this.readLock.lock(); this.readLock.lock();
Map<String, ResourceRequest> nodeRequests = SchedulingPlacementSet ps =
resourceRequestMap.get(schedulerKey); schedulerKeyToPlacementSets.get(schedulerKey);
return (nodeRequests == null) ? null : nodeRequests.get(resourceName); return (ps == null) ? null : ps.getResourceRequest(resourceName);
} finally { } finally {
this.readLock.unlock(); this.readLock.unlock();
} }
@ -698,141 +686,29 @@ public class AppSchedulingInfo {
public List<ResourceRequest> allocate(NodeType type, public List<ResourceRequest> allocate(NodeType type,
SchedulerNode node, SchedulerRequestKey schedulerKey, SchedulerNode node, SchedulerRequestKey schedulerKey,
ResourceRequest request,
Container containerAllocated) { Container containerAllocated) {
try { try {
writeLock.lock(); writeLock.lock();
ResourceRequest request;
if (type == NodeType.NODE_LOCAL) {
request = resourceRequestMap.get(schedulerKey).get(node.getNodeName());
} else if (type == NodeType.RACK_LOCAL) {
request = resourceRequestMap.get(schedulerKey).get(node.getRackName());
} else{
request = resourceRequestMap.get(schedulerKey).get(ResourceRequest.ANY);
}
return allocate(type, node, schedulerKey, request, containerAllocated);
} finally {
writeLock.unlock();
}
}
/**
* Resources have been allocated to this application by the resource
* scheduler. Track them.
* @param type Node Type
* @param node SchedulerNode
* @param schedulerKey SchedulerRequestKey
* @param request ResourceRequest
* @param containerAllocated Container Allocated
* @return List of ResourceRequests
*/
public List<ResourceRequest> allocate(NodeType type,
SchedulerNode node, SchedulerRequestKey schedulerKey,
ResourceRequest request, Container containerAllocated) {
try {
writeLock.lock();
List<ResourceRequest> resourceRequests = new ArrayList<>();
if (type == NodeType.NODE_LOCAL) {
allocateNodeLocal(node, schedulerKey, request, resourceRequests);
} else if (type == NodeType.RACK_LOCAL) {
allocateRackLocal(node, schedulerKey, request, resourceRequests);
} else{
allocateOffSwitch(request, resourceRequests, schedulerKey);
}
if (null != containerAllocated) { if (null != containerAllocated) {
updateMetricsForAllocatedContainer(request, type, containerAllocated); updateMetricsForAllocatedContainer(type, containerAllocated);
} }
return resourceRequests;
return schedulerKeyToPlacementSets.get(schedulerKey).allocate(type, node,
request);
} finally { } finally {
writeLock.unlock(); writeLock.unlock();
} }
} }
/** public List<ResourceRequest> allocate(NodeType type,
* The {@link ResourceScheduler} is allocating data-local resources to the SchedulerNode node, SchedulerRequestKey schedulerKey,
* application. Container containerAllocated) {
*/ return allocate(type, node, schedulerKey, null, containerAllocated);
private void allocateNodeLocal(SchedulerNode node,
SchedulerRequestKey schedulerKey, ResourceRequest nodeLocalRequest,
List<ResourceRequest> resourceRequests) {
// Update future requirements
decResourceRequest(node.getNodeName(), schedulerKey, nodeLocalRequest);
ResourceRequest rackLocalRequest = resourceRequestMap.get(schedulerKey).get(
node.getRackName());
decResourceRequest(node.getRackName(), schedulerKey, rackLocalRequest);
ResourceRequest offRackRequest = resourceRequestMap.get(schedulerKey).get(
ResourceRequest.ANY);
decrementOutstanding(offRackRequest, schedulerKey);
// Update cloned NodeLocal, RackLocal and OffRack requests for recovery
resourceRequests.add(cloneResourceRequest(nodeLocalRequest));
resourceRequests.add(cloneResourceRequest(rackLocalRequest));
resourceRequests.add(cloneResourceRequest(offRackRequest));
} }
private void decResourceRequest(String resourceName, public void checkForDeactivation() {
SchedulerRequestKey schedulerKey, ResourceRequest request) {
request.setNumContainers(request.getNumContainers() - 1);
if (request.getNumContainers() == 0) {
resourceRequestMap.get(schedulerKey).remove(resourceName);
}
}
/**
* The {@link ResourceScheduler} is allocating data-local resources to the
* application.
*/
private void allocateRackLocal(SchedulerNode node,
SchedulerRequestKey schedulerKey, ResourceRequest rackLocalRequest,
List<ResourceRequest> resourceRequests) {
// Update future requirements
decResourceRequest(node.getRackName(), schedulerKey, rackLocalRequest);
ResourceRequest offRackRequest = resourceRequestMap.get(schedulerKey).get(
ResourceRequest.ANY);
decrementOutstanding(offRackRequest, schedulerKey);
// Update cloned RackLocal and OffRack requests for recovery
resourceRequests.add(cloneResourceRequest(rackLocalRequest));
resourceRequests.add(cloneResourceRequest(offRackRequest));
}
/**
* The {@link ResourceScheduler} is allocating data-local resources to the
* application.
*/
private void allocateOffSwitch(ResourceRequest offSwitchRequest,
List<ResourceRequest> resourceRequests,
SchedulerRequestKey schedulerKey) {
// Update future requirements
decrementOutstanding(offSwitchRequest, schedulerKey);
// Update cloned OffRack requests for recovery
resourceRequests.add(cloneResourceRequest(offSwitchRequest));
}
private void decrementOutstanding(ResourceRequest offSwitchRequest,
SchedulerRequestKey schedulerKey) {
int numOffSwitchContainers = offSwitchRequest.getNumContainers() - 1;
// Do not remove ANY
offSwitchRequest.setNumContainers(numOffSwitchContainers);
// Do we have any outstanding requests?
// If there is nothing, we need to deactivate this application
if (numOffSwitchContainers == 0) {
decrementSchedulerKeyReference(schedulerKey);
checkForDeactivation();
}
appResourceUsage.decPending(offSwitchRequest.getNodeLabelExpression(),
offSwitchRequest.getCapability());
queue.decPendingResource(offSwitchRequest.getNodeLabelExpression(),
offSwitchRequest.getCapability());
}
private void checkForDeactivation() {
if (schedulerKeys.isEmpty()) { if (schedulerKeys.isEmpty()) {
activeUsersManager.deactivateApplication(user, applicationId); activeUsersManager.deactivateApplication(user, applicationId);
} }
@ -843,9 +719,9 @@ public class AppSchedulingInfo {
this.writeLock.lock(); this.writeLock.lock();
QueueMetrics oldMetrics = queue.getMetrics(); QueueMetrics oldMetrics = queue.getMetrics();
QueueMetrics newMetrics = newQueue.getMetrics(); QueueMetrics newMetrics = newQueue.getMetrics();
for (Map<String, ResourceRequest> asks : resourceRequestMap.values()) { for (SchedulingPlacementSet ps : schedulerKeyToPlacementSets.values()) {
ResourceRequest request = asks.get(ResourceRequest.ANY); ResourceRequest request = ps.getResourceRequest(ResourceRequest.ANY);
if (request != null) { if (request != null && request.getNumContainers() > 0) {
oldMetrics.decrPendingResources(user, request.getNumContainers(), oldMetrics.decrPendingResources(user, request.getNumContainers(),
request.getCapability()); request.getCapability());
newMetrics.incrPendingResources(user, request.getNumContainers(), newMetrics.incrPendingResources(user, request.getNumContainers(),
@ -874,9 +750,9 @@ public class AppSchedulingInfo {
try { try {
this.writeLock.lock(); this.writeLock.lock();
QueueMetrics metrics = queue.getMetrics(); QueueMetrics metrics = queue.getMetrics();
for (Map<String, ResourceRequest> asks : resourceRequestMap.values()) { for (SchedulingPlacementSet ps : schedulerKeyToPlacementSets.values()) {
ResourceRequest request = asks.get(ResourceRequest.ANY); ResourceRequest request = ps.getResourceRequest(ResourceRequest.ANY);
if (request != null) { if (request != null && request.getNumContainers() > 0) {
metrics.decrPendingResources(user, request.getNumContainers(), metrics.decrPendingResources(user, request.getNumContainers(),
request.getCapability()); request.getCapability());
@ -945,17 +821,6 @@ public class AppSchedulingInfo {
} }
} }
public ResourceRequest cloneResourceRequest(ResourceRequest request) {
ResourceRequest newRequest = ResourceRequest.newBuilder()
.priority(request.getPriority())
.resourceName(request.getResourceName())
.capability(request.getCapability())
.numContainers(1)
.relaxLocality(request.getRelaxLocality())
.nodeLabelExpression(request.getNodeLabelExpression()).build();
return newRequest;
}
/* /*
* In async environment, pending resource request could be updated during * In async environment, pending resource request could be updated during
* scheduling, this method checks pending request before allocating * scheduling, this method checks pending request before allocating
@ -964,34 +829,18 @@ public class AppSchedulingInfo {
SchedulerRequestKey schedulerKey) { SchedulerRequestKey schedulerKey) {
try { try {
readLock.lock(); readLock.lock();
ResourceRequest r = resourceRequestMap.get(schedulerKey).get( SchedulingPlacementSet ps = schedulerKeyToPlacementSets.get(schedulerKey);
ResourceRequest.ANY); if (null == ps) {
if (r == null || r.getNumContainers() <= 0) {
return false; return false;
} }
if (type == NodeType.RACK_LOCAL || type == NodeType.NODE_LOCAL) { return ps.canAllocate(type, node);
r = resourceRequestMap.get(schedulerKey).get(node.getRackName());
if (r == null || r.getNumContainers() <= 0) {
return false;
}
if (type == NodeType.NODE_LOCAL) {
r = resourceRequestMap.get(schedulerKey).get(node.getNodeName());
if (r == null || r.getNumContainers() <= 0) {
return false;
}
}
}
return true;
} finally { } finally {
readLock.unlock(); readLock.unlock();
} }
} }
public void updateMetricsForAllocatedContainer( private void updateMetricsForAllocatedContainer(
ResourceRequest request, NodeType type, Container containerAllocated) { NodeType type, Container containerAllocated) {
try {
writeLock.lock();
QueueMetrics metrics = queue.getMetrics(); QueueMetrics metrics = queue.getMetrics();
if (pending) { if (pending) {
// once an allocation is done we assume the application is // once an allocation is done we assume the application is
@ -1003,68 +852,20 @@ public class AppSchedulingInfo {
if (LOG.isDebugEnabled()) { if (LOG.isDebugEnabled()) {
LOG.debug("allocate: applicationId=" + applicationId + " container=" LOG.debug("allocate: applicationId=" + applicationId + " container="
+ containerAllocated.getId() + " host=" + containerAllocated + containerAllocated.getId() + " host=" + containerAllocated
.getNodeId().toString() + " user=" + user + " resource=" + request .getNodeId().toString() + " user=" + user + " resource="
.getCapability() + " type=" + type); + containerAllocated.getResource() + " type="
+ type);
} }
metrics.allocateResources(user, 1, request.getCapability(), true); metrics.allocateResources(user, 1, containerAllocated.getResource(),
true);
metrics.incrNodeTypeAggregations(user, type); metrics.incrNodeTypeAggregations(user, type);
} finally {
writeLock.unlock();
}
} }
// Get placement-set by specified schedulerKey // Get placement-set by specified schedulerKey
// Now simply return all node of the input clusterPlacementSet // Now simply return all node of the input clusterPlacementSet
// TODO, need update this when we support global scheduling
public <N extends SchedulerNode> SchedulingPlacementSet<N> getSchedulingPlacementSet( public <N extends SchedulerNode> SchedulingPlacementSet<N> getSchedulingPlacementSet(
SchedulerRequestKey schedulerkey) { SchedulerRequestKey schedulerkey) {
return new SchedulingPlacementSet<N>() { return (SchedulingPlacementSet<N>) schedulerKeyToPlacementSets.get(
@Override schedulerkey);
@SuppressWarnings("unchecked")
public Iterator<N> getPreferredNodeIterator(
PlacementSet<N> clusterPlacementSet) {
return IteratorUtils.singletonIterator(
clusterPlacementSet.getAllNodes().values().iterator().next());
}
@Override
public ResourceRequestUpdateResult updateResourceRequests(
List<ResourceRequest> requests,
boolean recoverPreemptedRequestForAContainer) {
return null;
}
@Override
public Map<String, ResourceRequest> getResourceRequests() {
return null;
}
@Override
public ResourceRequest getResourceRequest(String resourceName,
SchedulerRequestKey requestKey) {
return null;
}
@Override
public List<ResourceRequest> allocate(NodeType type, SchedulerNode node,
ResourceRequest request) {
return null;
}
@Override
public Map<NodeId, N> getAllNodes() {
return null;
}
@Override
public long getVersion() {
return 0;
}
@Override
public String getPartition() {
return null;
}
};
} }
} }

View File

@ -0,0 +1,311 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement;
import org.apache.commons.collections.IteratorUtils;
import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AppSchedulingInfo;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerRequestKey;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.locks.ReentrantReadWriteLock;
/**
 * {@link SchedulingPlacementSet} implementation that stores the classic
 * locality-based resource requests (node-local, rack-local and off-switch,
 * i.e. {@link ResourceRequest#ANY}) belonging to a single scheduler request
 * key of one application, keyed by resource name.
 * <p>
 * Changes to pending resources are reported back to the owning
 * {@link AppSchedulingInfo} (partition bookkeeping, pending-resource
 * decrements, scheduler-key reference counting and deactivation checks).
 * Internal consistency is protected by a private
 * {@link ReentrantReadWriteLock}; the backing map is also a
 * {@link ConcurrentHashMap} so the unlocked readers
 * ({@link #getResourceRequest(String)}, {@link #getResourceRequests()})
 * do not corrupt it.
 */
public class LocalitySchedulingPlacementSet<N extends SchedulerNode>
    implements SchedulingPlacementSet<N> {

  // Resource name (host name, rack name or ResourceRequest.ANY)
  // -> outstanding request for that name.
  private final Map<String, ResourceRequest> resourceRequestMap =
      new ConcurrentHashMap<>();

  // Owning application scheduling state; receives accounting callbacks
  // when requests are updated or consumed by allocations.
  private AppSchedulingInfo appSchedulingInfo;

  private final ReentrantReadWriteLock.ReadLock readLock;
  private final ReentrantReadWriteLock.WriteLock writeLock;

  public LocalitySchedulingPlacementSet(AppSchedulingInfo info) {
    ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
    readLock = lock.readLock();
    writeLock = lock.writeLock();
    this.appSchedulingInfo = info;
  }

  /**
   * Returns an iterator over the nodes to try for this request. Only a
   * single-node placement set is supported: if the given cluster placement
   * set contains exactly one node it is returned, otherwise the iterator
   * is empty.
   */
  @Override
  @SuppressWarnings("unchecked")
  public Iterator<N> getPreferredNodeIterator(
      PlacementSet<N> clusterPlacementSet) {
    // Now only handle the case that single node in placementSet
    // TODO, Add support to multi-hosts inside placement-set which is passed in.
    N singleNode = PlacementSetUtils.getSingleNode(clusterPlacementSet);
    if (null != singleNode) {
      return IteratorUtils.singletonIterator(singleNode);
    }
    return IteratorUtils.emptyIterator();
  }

  /**
   * @return true if the node label expression differs between the two
   *         requests (null in the first and non-null in the second counts
   *         as a change).
   */
  private boolean hasRequestLabelChanged(ResourceRequest requestOne,
      ResourceRequest requestTwo) {
    String requestOneLabelExp = requestOne.getNodeLabelExpression();
    String requestTwoLabelExp = requestTwo.getNodeLabelExpression();
    // First request label expression can be null and second request
    // is not null then we have to consider it as changed.
    if ((null == requestOneLabelExp) && (null != requestTwoLabelExp)) {
      return true;
    }
    // If the label is not matching between both request when
    // requestOneLabelExp is not null.
    return ((null != requestOneLabelExp) && !(requestOneLabelExp
        .equals(requestTwoLabelExp)));
  }

  /**
   * Keeps node label expressions consistent: non-ANY requests always carry
   * the label expression of the ANY request with the same priority.
   */
  private void updateNodeLabels(ResourceRequest request) {
    String resourceName = request.getResourceName();
    if (resourceName.equals(ResourceRequest.ANY)) {
      ResourceRequest previousAnyRequest =
          getResourceRequest(resourceName);

      // When there is change in ANY request label expression, we should
      // update label for all resource requests already added of same
      // priority as ANY resource request.
      if ((null == previousAnyRequest) || hasRequestLabelChanged(
          previousAnyRequest, request)) {
        for (ResourceRequest r : resourceRequestMap.values()) {
          if (!r.getResourceName().equals(ResourceRequest.ANY)) {
            r.setNodeLabelExpression(request.getNodeLabelExpression());
          }
        }
      }
    } else {
      // Non-ANY request: inherit the label of the existing ANY request.
      ResourceRequest anyRequest = getResourceRequest(ResourceRequest.ANY);
      if (anyRequest != null) {
        request.setNodeLabelExpression(anyRequest.getNodeLabelExpression());
      }
    }
  }

  /**
   * Replaces the stored requests with the incoming ones, keeping node
   * labels consistent and, when recovering a preempted container,
   * incrementing the container count of the previous ask.
   *
   * @param requests incoming resource requests, grouped by resource name
   * @param recoverPreemptedRequestForAContainer true if this update
   *          re-adds an ask for a preempted container
   * @return the (previous ANY, new ANY) request pair when the ANY request
   *         was updated, otherwise null (no pending-amount change for the
   *         caller to account for)
   */
  @Override
  public ResourceRequestUpdateResult updateResourceRequests(
      Collection<ResourceRequest> requests,
      boolean recoverPreemptedRequestForAContainer) {
    try {
      this.writeLock.lock();

      ResourceRequestUpdateResult updateResult = null;

      // Update resource requests
      for (ResourceRequest request : requests) {
        String resourceName = request.getResourceName();

        // Update node labels if required
        updateNodeLabels(request);

        // Increment number of containers if recovering preempted resources
        ResourceRequest lastRequest = resourceRequestMap.get(resourceName);
        if (recoverPreemptedRequestForAContainer && lastRequest != null) {
          request.setNumContainers(lastRequest.getNumContainers() + 1);
        }

        // Update asks
        resourceRequestMap.put(resourceName, request);

        if (resourceName.equals(ResourceRequest.ANY)) {
          //update the applications requested labels set
          appSchedulingInfo.addRequestedPartition(
              request.getNodeLabelExpression() == null ?
                  RMNodeLabelsManager.NO_LABEL :
                  request.getNodeLabelExpression());

          updateResult = new ResourceRequestUpdateResult(lastRequest, request);
        }
      }
      return updateResult;
    } finally {
      this.writeLock.unlock();
    }
  }

  /**
   * @return the live internal resource-name -> request map (not a copy;
   *         callers must not assume a snapshot)
   */
  @Override
  public Map<String, ResourceRequest> getResourceRequests() {
    return resourceRequestMap;
  }

  /** @return the stored request for the given resource name, or null. */
  @Override
  public ResourceRequest getResourceRequest(String resourceName) {
    return resourceRequestMap.get(resourceName);
  }

  /**
   * Decrements the off-switch (ANY) request by one container and updates
   * pending-resource accounting through the owning AppSchedulingInfo.
   * When the count reaches zero the scheduler-key reference is dropped and
   * the application is checked for deactivation. Note the ANY entry itself
   * is never removed from the map (unlike node/rack entries, see
   * decResourceRequest).
   */
  private void decrementOutstanding(ResourceRequest offSwitchRequest) {
    int numOffSwitchContainers = offSwitchRequest.getNumContainers() - 1;

    // Do not remove ANY
    offSwitchRequest.setNumContainers(numOffSwitchContainers);

    // Do we have any outstanding requests?
    // If there is nothing, we need to deactivate this application
    if (numOffSwitchContainers == 0) {
      SchedulerRequestKey schedulerRequestKey = SchedulerRequestKey.create(
          offSwitchRequest);
      appSchedulingInfo.decrementSchedulerKeyReference(schedulerRequestKey);
      appSchedulingInfo.checkForDeactivation();
    }

    appSchedulingInfo.decPendingResource(
        offSwitchRequest.getNodeLabelExpression(),
        offSwitchRequest.getCapability());
  }

  /**
   * Clones a request with numContainers forced to 1; used to report the
   * exact asks consumed by a single allocation (for container recovery).
   */
  private ResourceRequest cloneResourceRequest(ResourceRequest request) {
    ResourceRequest newRequest =
        ResourceRequest.newInstance(request.getPriority(),
            request.getResourceName(), request.getCapability(), 1,
            request.getRelaxLocality(), request.getNodeLabelExpression());
    return newRequest;
  }

  /**
   * The {@link ResourceScheduler} is allocating rack-local resources to the
   * application: decrement the rack request and the shared off-switch (ANY)
   * request, recording cloned copies of the consumed asks for recovery.
   */
  private void allocateRackLocal(SchedulerNode node,
      ResourceRequest rackLocalRequest,
      List<ResourceRequest> resourceRequests) {
    // Update future requirements
    decResourceRequest(node.getRackName(), rackLocalRequest);

    ResourceRequest offRackRequest = resourceRequestMap.get(
        ResourceRequest.ANY);
    decrementOutstanding(offRackRequest);

    // Update cloned RackLocal and OffRack requests for recovery
    resourceRequests.add(cloneResourceRequest(rackLocalRequest));
    resourceRequests.add(cloneResourceRequest(offRackRequest));
  }

  /**
   * The {@link ResourceScheduler} is allocating off-switch resources to the
   * application: decrement only the ANY request and record a cloned copy
   * for recovery.
   */
  private void allocateOffSwitch(ResourceRequest offSwitchRequest,
      List<ResourceRequest> resourceRequests) {
    // Update future requirements
    decrementOutstanding(offSwitchRequest);
    // Update cloned OffRack requests for recovery
    resourceRequests.add(cloneResourceRequest(offSwitchRequest));
  }

  /**
   * The {@link ResourceScheduler} is allocating data-local (node-local)
   * resources to the application: decrement the node, rack and ANY
   * requests, recording cloned copies of the consumed asks for recovery.
   */
  private void allocateNodeLocal(SchedulerNode node,
      ResourceRequest nodeLocalRequest,
      List<ResourceRequest> resourceRequests) {
    // Update future requirements
    decResourceRequest(node.getNodeName(), nodeLocalRequest);

    ResourceRequest rackLocalRequest = resourceRequestMap.get(
        node.getRackName());
    decResourceRequest(node.getRackName(), rackLocalRequest);

    ResourceRequest offRackRequest = resourceRequestMap.get(
        ResourceRequest.ANY);
    decrementOutstanding(offRackRequest);

    // Update cloned NodeLocal, RackLocal and OffRack requests for recovery
    resourceRequests.add(cloneResourceRequest(nodeLocalRequest));
    resourceRequests.add(cloneResourceRequest(rackLocalRequest));
    resourceRequests.add(cloneResourceRequest(offRackRequest));
  }

  /**
   * Decrements the container count of a node/rack request and removes the
   * map entry once it drops to zero (unlike the ANY request, which
   * decrementOutstanding keeps in the map).
   */
  private void decResourceRequest(String resourceName,
      ResourceRequest request) {
    request.setNumContainers(request.getNumContainers() - 1);
    if (request.getNumContainers() == 0) {
      resourceRequestMap.remove(resourceName);
    }
  }

  /**
   * Checks, under the read lock, whether an allocation of the given type is
   * still possible on the given node: the ANY request must have containers
   * remaining, and for rack/node locality the matching rack (and node)
   * requests must too.
   */
  @Override
  public boolean canAllocate(NodeType type, SchedulerNode node) {
    try {
      readLock.lock();
      ResourceRequest r = resourceRequestMap.get(
          ResourceRequest.ANY);
      if (r == null || r.getNumContainers() <= 0) {
        return false;
      }
      if (type == NodeType.RACK_LOCAL || type == NodeType.NODE_LOCAL) {
        r = resourceRequestMap.get(node.getRackName());
        if (r == null || r.getNumContainers() <= 0) {
          return false;
        }
        if (type == NodeType.NODE_LOCAL) {
          r = resourceRequestMap.get(node.getNodeName());
          if (r == null || r.getNumContainers() <= 0) {
            return false;
          }
        }
      }

      return true;
    } finally {
      readLock.unlock();
    }
  }

  /**
   * Records an allocation of the given locality type on the given node.
   * When no explicit request is supplied, the stored request matching the
   * locality type (node name / rack name / ANY) is looked up first.
   *
   * @param type locality type of the allocation
   * @param node node the container was allocated on
   * @param request the consumed request, or null to look it up by type
   * @return cloned copies (numContainers == 1) of every request decremented
   *         by this allocation, usable for container recovery
   */
  @Override
  public List<ResourceRequest> allocate(NodeType type, SchedulerNode node,
      ResourceRequest request) {
    try {
      writeLock.lock();

      List<ResourceRequest> resourceRequests = new ArrayList<>();

      if (null == request) {
        if (type == NodeType.NODE_LOCAL) {
          request = resourceRequestMap.get(node.getNodeName());
        } else if (type == NodeType.RACK_LOCAL) {
          request = resourceRequestMap.get(node.getRackName());
        } else {
          request = resourceRequestMap.get(ResourceRequest.ANY);
        }
      }

      if (type == NodeType.NODE_LOCAL) {
        allocateNodeLocal(node, request, resourceRequests);
      } else if (type == NodeType.RACK_LOCAL) {
        allocateRackLocal(node, request, resourceRequests);
      } else {
        allocateOffSwitch(request, resourceRequests);
      }

      return resourceRequests;
    } finally {
      writeLock.unlock();
    }
  }
}

View File

@ -23,13 +23,14 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerRequestKey; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerRequestKey;
import java.util.Collection;
import java.util.Iterator; import java.util.Iterator;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
/** /**
* <p> * <p>
* In addition to {@link PlacementSet}, this also maintains * Comparing to {@link PlacementSet}, this also maintains
* pending ResourceRequests: * pending ResourceRequests:
* - When new ResourceRequest(s) added to scheduler, or, * - When new ResourceRequest(s) added to scheduler, or,
* - Or new container allocated, scheduler can notify corresponding * - Or new container allocated, scheduler can notify corresponding
@ -42,8 +43,7 @@ import java.util.Map;
* can have different ways to order nodes depends on requests. * can have different ways to order nodes depends on requests.
* </p> * </p>
*/ */
public interface SchedulingPlacementSet<N extends SchedulerNode> public interface SchedulingPlacementSet<N extends SchedulerNode> {
extends PlacementSet<N> {
/** /**
* Get iterator of preferred node depends on requirement and/or availability * Get iterator of preferred node depends on requirement and/or availability
* @param clusterPlacementSet input cluster PlacementSet * @param clusterPlacementSet input cluster PlacementSet
@ -60,7 +60,7 @@ public interface SchedulingPlacementSet<N extends SchedulerNode>
* @return true if total pending resource changed * @return true if total pending resource changed
*/ */
ResourceRequestUpdateResult updateResourceRequests( ResourceRequestUpdateResult updateResourceRequests(
List<ResourceRequest> requests, Collection<ResourceRequest> requests,
boolean recoverPreemptedRequestForAContainer); boolean recoverPreemptedRequestForAContainer);
/** /**
@ -72,19 +72,25 @@ public interface SchedulingPlacementSet<N extends SchedulerNode>
/** /**
* Get ResourceRequest by given schedulerKey and resourceName * Get ResourceRequest by given schedulerKey and resourceName
* @param resourceName resourceName * @param resourceName resourceName
* @param schedulerRequestKey schedulerRequestKey
* @return ResourceRequest * @return ResourceRequest
*/ */
ResourceRequest getResourceRequest(String resourceName, ResourceRequest getResourceRequest(String resourceName);
SchedulerRequestKey schedulerRequestKey);
/** /**
* Notify container allocated. * Notify container allocated.
* @param type Type of the allocation * @param type Type of the allocation
* @param node Which node this container allocated on * @param node Which node this container allocated on
* @param request resource request * @param request Which resource request to allocate
* @return list of ResourceRequests deducted * @return list of ResourceRequests deducted
*/ */
List<ResourceRequest> allocate(NodeType type, SchedulerNode node, List<ResourceRequest> allocate(NodeType type, SchedulerNode node,
ResourceRequest request); ResourceRequest request);
/**
* We can still have pending requirement for a given NodeType and node
* @param type Locality Type
* @param node which node we will allocate on
 * @return true if we have a pending requirement * @return true if we have a pending requirement
*/
boolean canAllocate(NodeType type, SchedulerNode node);
} }

View File

@ -687,6 +687,9 @@ public class TestApplicationLimitsByPartition {
List<ResourceRequest> app_0_1_requests = new ArrayList<ResourceRequest>(); List<ResourceRequest> app_0_1_requests = new ArrayList<ResourceRequest>();
app_0_1_requests.add(TestUtils.createResourceRequest(ResourceRequest.ANY, app_0_1_requests.add(TestUtils.createResourceRequest(ResourceRequest.ANY,
1 * GB, 2, true, priority_1, recordFactory)); 1 * GB, 2, true, priority_1, recordFactory));
app_0_1.updateResourceRequests(app_0_1_requests);
app_0_1_requests.clear();
app_0_1_requests.add(TestUtils.createResourceRequest(ResourceRequest.ANY, app_0_1_requests.add(TestUtils.createResourceRequest(ResourceRequest.ANY,
1 * GB, 2, true, priority_1, recordFactory, "y")); 1 * GB, 2, true, priority_1, recordFactory, "y"));
app_0_1.updateResourceRequests(app_0_1_requests); app_0_1.updateResourceRequests(app_0_1_requests);
@ -715,6 +718,9 @@ public class TestApplicationLimitsByPartition {
List<ResourceRequest> app_1_0_requests = new ArrayList<ResourceRequest>(); List<ResourceRequest> app_1_0_requests = new ArrayList<ResourceRequest>();
app_1_0_requests.add(TestUtils.createResourceRequest(ResourceRequest.ANY, app_1_0_requests.add(TestUtils.createResourceRequest(ResourceRequest.ANY,
1 * GB, 2, true, priority_1, recordFactory)); 1 * GB, 2, true, priority_1, recordFactory));
app_1_0.updateResourceRequests(app_1_0_requests);
app_1_0_requests.clear();
app_1_0_requests.add(TestUtils.createResourceRequest(ResourceRequest.ANY, app_1_0_requests.add(TestUtils.createResourceRequest(ResourceRequest.ANY,
1 * GB, 2, true, priority_1, recordFactory, "y")); 1 * GB, 2, true, priority_1, recordFactory, "y"));
app_1_0.updateResourceRequests(app_1_0_requests); app_1_0.updateResourceRequests(app_1_0_requests);