YARN-5906. Update AppSchedulingInfo to use SchedulingPlacementSet. Contributed by Wangda Tan.
This commit is contained in:
parent
972da46cb4
commit
9ca54f4810
|
@ -18,22 +18,6 @@
|
||||||
|
|
||||||
package org.apache.hadoop.yarn.server.resourcemanager.scheduler;
|
package org.apache.hadoop.yarn.server.resourcemanager.scheduler;
|
||||||
|
|
||||||
import java.util.ArrayList;
|
|
||||||
import java.util.Collection;
|
|
||||||
import java.util.HashMap;
|
|
||||||
import java.util.HashSet;
|
|
||||||
import java.util.Iterator;
|
|
||||||
import java.util.List;
|
|
||||||
import java.util.Map;
|
|
||||||
import java.util.Set;
|
|
||||||
import java.util.TreeMap;
|
|
||||||
import java.util.concurrent.ConcurrentHashMap;
|
|
||||||
import java.util.concurrent.ConcurrentSkipListMap;
|
|
||||||
import java.util.concurrent.atomic.AtomicBoolean;
|
|
||||||
import java.util.concurrent.atomic.AtomicLong;
|
|
||||||
import java.util.concurrent.locks.ReentrantReadWriteLock;
|
|
||||||
|
|
||||||
import org.apache.commons.collections.IteratorUtils;
|
|
||||||
import org.apache.commons.logging.Log;
|
import org.apache.commons.logging.Log;
|
||||||
import org.apache.commons.logging.LogFactory;
|
import org.apache.commons.logging.LogFactory;
|
||||||
import org.apache.hadoop.classification.InterfaceAudience.Private;
|
import org.apache.hadoop.classification.InterfaceAudience.Private;
|
||||||
|
@ -48,16 +32,30 @@ import org.apache.hadoop.yarn.api.records.ResourceRequest;
|
||||||
import org.apache.hadoop.yarn.exceptions.YarnException;
|
import org.apache.hadoop.yarn.exceptions.YarnException;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.RMServerUtils;
|
import org.apache.hadoop.yarn.server.resourcemanager.RMServerUtils;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
|
import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
|
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
|
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState;
|
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.PlacementSet;
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.LocalitySchedulingPlacementSet;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.ResourceRequestUpdateResult;
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.ResourceRequestUpdateResult;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.SchedulingPlacementSet;
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.SchedulingPlacementSet;
|
||||||
|
|
||||||
import org.apache.hadoop.yarn.server.scheduler.SchedulerRequestKey;
|
import org.apache.hadoop.yarn.server.scheduler.SchedulerRequestKey;
|
||||||
import org.apache.hadoop.yarn.util.resource.Resources;
|
import org.apache.hadoop.yarn.util.resource.Resources;
|
||||||
|
|
||||||
|
import java.util.ArrayList;
|
||||||
|
import java.util.Collection;
|
||||||
|
import java.util.Collections;
|
||||||
|
import java.util.HashMap;
|
||||||
|
import java.util.HashSet;
|
||||||
|
import java.util.List;
|
||||||
|
import java.util.Map;
|
||||||
|
import java.util.Set;
|
||||||
|
import java.util.TreeMap;
|
||||||
|
import java.util.concurrent.ConcurrentHashMap;
|
||||||
|
import java.util.concurrent.ConcurrentSkipListMap;
|
||||||
|
import java.util.concurrent.atomic.AtomicBoolean;
|
||||||
|
import java.util.concurrent.atomic.AtomicLong;
|
||||||
|
import java.util.concurrent.locks.ReentrantReadWriteLock;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* This class keeps track of all the consumption of an application. This also
|
* This class keeps track of all the consumption of an application. This also
|
||||||
* keeps track of current running/completed containers for the application.
|
* keeps track of current running/completed containers for the application.
|
||||||
|
@ -89,8 +87,8 @@ public class AppSchedulingInfo {
|
||||||
|
|
||||||
private final ConcurrentSkipListMap<SchedulerRequestKey, Integer>
|
private final ConcurrentSkipListMap<SchedulerRequestKey, Integer>
|
||||||
schedulerKeys = new ConcurrentSkipListMap<>();
|
schedulerKeys = new ConcurrentSkipListMap<>();
|
||||||
final Map<SchedulerRequestKey, Map<String, ResourceRequest>>
|
final Map<SchedulerRequestKey, SchedulingPlacementSet<SchedulerNode>>
|
||||||
resourceRequestMap = new ConcurrentHashMap<>();
|
schedulerKeyToPlacementSets = new ConcurrentHashMap<>();
|
||||||
final Map<NodeId, Map<SchedulerRequestKey, Map<ContainerId,
|
final Map<NodeId, Map<SchedulerRequestKey, Map<ContainerId,
|
||||||
SchedContainerChangeRequest>>> containerIncreaseRequestMap =
|
SchedContainerChangeRequest>>> containerIncreaseRequestMap =
|
||||||
new ConcurrentHashMap<>();
|
new ConcurrentHashMap<>();
|
||||||
|
@ -153,7 +151,7 @@ public class AppSchedulingInfo {
|
||||||
*/
|
*/
|
||||||
private void clearRequests() {
|
private void clearRequests() {
|
||||||
schedulerKeys.clear();
|
schedulerKeys.clear();
|
||||||
resourceRequestMap.clear();
|
schedulerKeyToPlacementSets.clear();
|
||||||
LOG.info("Application " + applicationId + " requests cleared");
|
LOG.info("Application " + applicationId + " requests cleared");
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -299,7 +297,7 @@ public class AppSchedulingInfo {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private void decrementSchedulerKeyReference(
|
public void decrementSchedulerKeyReference(
|
||||||
SchedulerRequestKey schedulerKey) {
|
SchedulerRequestKey schedulerKey) {
|
||||||
Integer schedulerKeyCount = schedulerKeys.get(schedulerKey);
|
Integer schedulerKeyCount = schedulerKeys.get(schedulerKey);
|
||||||
if (schedulerKeyCount != null) {
|
if (schedulerKeyCount != null) {
|
||||||
|
@ -391,49 +389,55 @@ public class AppSchedulingInfo {
|
||||||
*/
|
*/
|
||||||
public boolean updateResourceRequests(List<ResourceRequest> requests,
|
public boolean updateResourceRequests(List<ResourceRequest> requests,
|
||||||
boolean recoverPreemptedRequestForAContainer) {
|
boolean recoverPreemptedRequestForAContainer) {
|
||||||
|
if (null == requests || requests.isEmpty()) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
// Flag to track if any incoming requests update "ANY" requests
|
// Flag to track if any incoming requests update "ANY" requests
|
||||||
boolean anyResourcesUpdated = false;
|
boolean offswitchResourcesUpdated = false;
|
||||||
|
|
||||||
try {
|
try {
|
||||||
this.writeLock.lock();
|
this.writeLock.lock();
|
||||||
// Update resource requests
|
|
||||||
|
// A map to group resource requests and dedup
|
||||||
|
Map<SchedulerRequestKey, Map<String, ResourceRequest>> dedupRequests =
|
||||||
|
new HashMap<>();
|
||||||
|
|
||||||
|
// Group resource request by schedulerRequestKey and resourceName
|
||||||
for (ResourceRequest request : requests) {
|
for (ResourceRequest request : requests) {
|
||||||
SchedulerRequestKey schedulerKey = SchedulerRequestKey.create(request);
|
SchedulerRequestKey schedulerKey = SchedulerRequestKey.create(request);
|
||||||
String resourceName = request.getResourceName();
|
if (!dedupRequests.containsKey(schedulerKey)) {
|
||||||
|
dedupRequests.put(schedulerKey, new HashMap<>());
|
||||||
// Update node labels if required
|
}
|
||||||
updateNodeLabels(request);
|
dedupRequests.get(schedulerKey).put(request.getResourceName(), request);
|
||||||
|
|
||||||
Map<String, ResourceRequest> asks =
|
|
||||||
this.resourceRequestMap.get(schedulerKey);
|
|
||||||
if (asks == null) {
|
|
||||||
asks = new ConcurrentHashMap<>();
|
|
||||||
this.resourceRequestMap.put(schedulerKey, asks);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// Increment number of containers if recovering preempted resources
|
// Update scheduling placement set
|
||||||
ResourceRequest lastRequest = asks.get(resourceName);
|
for (Map.Entry<SchedulerRequestKey, Map<String, ResourceRequest>> entry : dedupRequests.entrySet()) {
|
||||||
if (recoverPreemptedRequestForAContainer && lastRequest != null) {
|
SchedulerRequestKey schedulerRequestKey = entry.getKey();
|
||||||
request.setNumContainers(lastRequest.getNumContainers() + 1);
|
|
||||||
|
if (!schedulerKeyToPlacementSets.containsKey(schedulerRequestKey)) {
|
||||||
|
schedulerKeyToPlacementSets.put(schedulerRequestKey,
|
||||||
|
new LocalitySchedulingPlacementSet<>(this));
|
||||||
}
|
}
|
||||||
|
|
||||||
// Update asks
|
// Update placement set
|
||||||
asks.put(resourceName, request);
|
ResourceRequestUpdateResult pendingAmountChanges =
|
||||||
|
schedulerKeyToPlacementSets.get(schedulerRequestKey)
|
||||||
|
.updateResourceRequests(
|
||||||
|
entry.getValue().values(),
|
||||||
|
recoverPreemptedRequestForAContainer);
|
||||||
|
|
||||||
if (resourceName.equals(ResourceRequest.ANY)) {
|
if (null != pendingAmountChanges) {
|
||||||
//update the applications requested labels set
|
updatePendingResources(
|
||||||
requestedPartitions.add(request.getNodeLabelExpression() == null
|
pendingAmountChanges.getLastAnyResourceRequest(),
|
||||||
? RMNodeLabelsManager.NO_LABEL :
|
pendingAmountChanges.getNewResourceRequest(), schedulerRequestKey,
|
||||||
request.getNodeLabelExpression());
|
|
||||||
|
|
||||||
anyResourcesUpdated = true;
|
|
||||||
|
|
||||||
// Update pendingResources
|
|
||||||
updatePendingResources(lastRequest, request, schedulerKey,
|
|
||||||
queue.getMetrics());
|
queue.getMetrics());
|
||||||
|
offswitchResourcesUpdated = true;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
return anyResourcesUpdated;
|
|
||||||
|
return offswitchResourcesUpdated;
|
||||||
} finally {
|
} finally {
|
||||||
this.writeLock.unlock();
|
this.writeLock.unlock();
|
||||||
}
|
}
|
||||||
|
@ -483,35 +487,13 @@ public class AppSchedulingInfo {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private void updateNodeLabels(ResourceRequest request) {
|
public void addRequestedPartition(String partition) {
|
||||||
SchedulerRequestKey schedulerKey = SchedulerRequestKey.create(request);
|
requestedPartitions.add(partition);
|
||||||
String resourceName = request.getResourceName();
|
}
|
||||||
if (resourceName.equals(ResourceRequest.ANY)) {
|
|
||||||
ResourceRequest previousAnyRequest =
|
|
||||||
getResourceRequest(schedulerKey, resourceName);
|
|
||||||
|
|
||||||
// When there is change in ANY request label expression, we should
|
public void decPendingResource(String partition, Resource toDecrease) {
|
||||||
// update label for all resource requests already added of same
|
queue.decPendingResource(partition, toDecrease);
|
||||||
// priority as ANY resource request.
|
appResourceUsage.decPending(partition, toDecrease);
|
||||||
if ((null == previousAnyRequest)
|
|
||||||
|| hasRequestLabelChanged(previousAnyRequest, request)) {
|
|
||||||
Map<String, ResourceRequest> resourceRequest =
|
|
||||||
getResourceRequests(schedulerKey);
|
|
||||||
if (resourceRequest != null) {
|
|
||||||
for (ResourceRequest r : resourceRequest.values()) {
|
|
||||||
if (!r.getResourceName().equals(ResourceRequest.ANY)) {
|
|
||||||
r.setNodeLabelExpression(request.getNodeLabelExpression());
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
ResourceRequest anyRequest =
|
|
||||||
getResourceRequest(schedulerKey, ResourceRequest.ANY);
|
|
||||||
if (anyRequest != null) {
|
|
||||||
request.setNodeLabelExpression(anyRequest.getNodeLabelExpression());
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
private boolean hasRequestLabelChanged(ResourceRequest requestOne,
|
private boolean hasRequestLabelChanged(ResourceRequest requestOne,
|
||||||
|
@ -584,17 +566,22 @@ public class AppSchedulingInfo {
|
||||||
return schedulerKeys.keySet();
|
return schedulerKeys.keySet();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@SuppressWarnings("unchecked")
|
||||||
public Map<String, ResourceRequest> getResourceRequests(
|
public Map<String, ResourceRequest> getResourceRequests(
|
||||||
SchedulerRequestKey schedulerKey) {
|
SchedulerRequestKey schedulerKey) {
|
||||||
return resourceRequestMap.get(schedulerKey);
|
SchedulingPlacementSet ps = schedulerKeyToPlacementSets.get(schedulerKey);
|
||||||
|
if (null != ps) {
|
||||||
|
return ps.getResourceRequests();
|
||||||
|
}
|
||||||
|
return Collections.emptyMap();
|
||||||
}
|
}
|
||||||
|
|
||||||
public List<ResourceRequest> getAllResourceRequests() {
|
public List<ResourceRequest> getAllResourceRequests() {
|
||||||
List<ResourceRequest> ret = new ArrayList<>();
|
List<ResourceRequest> ret = new ArrayList<>();
|
||||||
try {
|
try {
|
||||||
this.readLock.lock();
|
this.readLock.lock();
|
||||||
for (Map<String, ResourceRequest> r : resourceRequestMap.values()) {
|
for (SchedulingPlacementSet ps : schedulerKeyToPlacementSets.values()) {
|
||||||
ret.addAll(r.values());
|
ret.addAll(ps.getResourceRequests().values());
|
||||||
}
|
}
|
||||||
} finally {
|
} finally {
|
||||||
this.readLock.unlock();
|
this.readLock.unlock();
|
||||||
|
@ -606,9 +593,9 @@ public class AppSchedulingInfo {
|
||||||
String resourceName) {
|
String resourceName) {
|
||||||
try {
|
try {
|
||||||
this.readLock.lock();
|
this.readLock.lock();
|
||||||
Map<String, ResourceRequest> nodeRequests =
|
SchedulingPlacementSet ps =
|
||||||
resourceRequestMap.get(schedulerKey);
|
schedulerKeyToPlacementSets.get(schedulerKey);
|
||||||
return (nodeRequests == null) ? null : nodeRequests.get(resourceName);
|
return (ps == null) ? null : ps.getResourceRequest(resourceName);
|
||||||
} finally {
|
} finally {
|
||||||
this.readLock.unlock();
|
this.readLock.unlock();
|
||||||
}
|
}
|
||||||
|
@ -635,10 +622,14 @@ public class AppSchedulingInfo {
|
||||||
*/
|
*/
|
||||||
@Unstable
|
@Unstable
|
||||||
public synchronized ResourceRequest getNextResourceRequest() {
|
public synchronized ResourceRequest getNextResourceRequest() {
|
||||||
for (ResourceRequest rr:
|
SchedulingPlacementSet<SchedulerNode> ps = schedulerKeyToPlacementSets.get(
|
||||||
resourceRequestMap.get(schedulerKeys.firstKey()).values()) {
|
schedulerKeys.firstKey());
|
||||||
|
if (null != ps) {
|
||||||
|
for (ResourceRequest rr : ps.getResourceRequests().values()) {
|
||||||
return rr;
|
return rr;
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
return null;
|
return null;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -717,141 +708,29 @@ public class AppSchedulingInfo {
|
||||||
|
|
||||||
public List<ResourceRequest> allocate(NodeType type,
|
public List<ResourceRequest> allocate(NodeType type,
|
||||||
SchedulerNode node, SchedulerRequestKey schedulerKey,
|
SchedulerNode node, SchedulerRequestKey schedulerKey,
|
||||||
|
ResourceRequest request,
|
||||||
Container containerAllocated) {
|
Container containerAllocated) {
|
||||||
try {
|
try {
|
||||||
writeLock.lock();
|
writeLock.lock();
|
||||||
ResourceRequest request;
|
|
||||||
if (type == NodeType.NODE_LOCAL) {
|
|
||||||
request = resourceRequestMap.get(schedulerKey).get(node.getNodeName());
|
|
||||||
} else if (type == NodeType.RACK_LOCAL) {
|
|
||||||
request = resourceRequestMap.get(schedulerKey).get(node.getRackName());
|
|
||||||
} else{
|
|
||||||
request = resourceRequestMap.get(schedulerKey).get(ResourceRequest.ANY);
|
|
||||||
}
|
|
||||||
return allocate(type, node, schedulerKey, request, containerAllocated);
|
|
||||||
} finally {
|
|
||||||
writeLock.unlock();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Resources have been allocated to this application by the resource
|
|
||||||
* scheduler. Track them.
|
|
||||||
* @param type Node Type
|
|
||||||
* @param node SchedulerNode
|
|
||||||
* @param schedulerKey SchedulerRequestKey
|
|
||||||
* @param request ResourceRequest
|
|
||||||
* @param containerAllocated Container Allocated
|
|
||||||
* @return List of ResourceRequests
|
|
||||||
*/
|
|
||||||
public List<ResourceRequest> allocate(NodeType type,
|
|
||||||
SchedulerNode node, SchedulerRequestKey schedulerKey,
|
|
||||||
ResourceRequest request, Container containerAllocated) {
|
|
||||||
try {
|
|
||||||
writeLock.lock();
|
|
||||||
List<ResourceRequest> resourceRequests = new ArrayList<>();
|
|
||||||
if (type == NodeType.NODE_LOCAL) {
|
|
||||||
allocateNodeLocal(node, schedulerKey, request, resourceRequests);
|
|
||||||
} else if (type == NodeType.RACK_LOCAL) {
|
|
||||||
allocateRackLocal(node, schedulerKey, request, resourceRequests);
|
|
||||||
} else{
|
|
||||||
allocateOffSwitch(request, resourceRequests, schedulerKey);
|
|
||||||
}
|
|
||||||
|
|
||||||
if (null != containerAllocated) {
|
if (null != containerAllocated) {
|
||||||
updateMetricsForAllocatedContainer(request, type, containerAllocated);
|
updateMetricsForAllocatedContainer(type, containerAllocated);
|
||||||
}
|
}
|
||||||
return resourceRequests;
|
|
||||||
|
return schedulerKeyToPlacementSets.get(schedulerKey).allocate(type, node,
|
||||||
|
request);
|
||||||
} finally {
|
} finally {
|
||||||
writeLock.unlock();
|
writeLock.unlock();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
public List<ResourceRequest> allocate(NodeType type,
|
||||||
* The {@link ResourceScheduler} is allocating data-local resources to the
|
SchedulerNode node, SchedulerRequestKey schedulerKey,
|
||||||
* application.
|
Container containerAllocated) {
|
||||||
*/
|
return allocate(type, node, schedulerKey, null, containerAllocated);
|
||||||
private void allocateNodeLocal(SchedulerNode node,
|
|
||||||
SchedulerRequestKey schedulerKey, ResourceRequest nodeLocalRequest,
|
|
||||||
List<ResourceRequest> resourceRequests) {
|
|
||||||
// Update future requirements
|
|
||||||
decResourceRequest(node.getNodeName(), schedulerKey, nodeLocalRequest);
|
|
||||||
|
|
||||||
ResourceRequest rackLocalRequest = resourceRequestMap.get(schedulerKey).get(
|
|
||||||
node.getRackName());
|
|
||||||
decResourceRequest(node.getRackName(), schedulerKey, rackLocalRequest);
|
|
||||||
|
|
||||||
ResourceRequest offRackRequest = resourceRequestMap.get(schedulerKey).get(
|
|
||||||
ResourceRequest.ANY);
|
|
||||||
decrementOutstanding(offRackRequest, schedulerKey);
|
|
||||||
|
|
||||||
// Update cloned NodeLocal, RackLocal and OffRack requests for recovery
|
|
||||||
resourceRequests.add(cloneResourceRequest(nodeLocalRequest));
|
|
||||||
resourceRequests.add(cloneResourceRequest(rackLocalRequest));
|
|
||||||
resourceRequests.add(cloneResourceRequest(offRackRequest));
|
|
||||||
}
|
}
|
||||||
|
|
||||||
private void decResourceRequest(String resourceName,
|
public void checkForDeactivation() {
|
||||||
SchedulerRequestKey schedulerKey, ResourceRequest request) {
|
|
||||||
request.setNumContainers(request.getNumContainers() - 1);
|
|
||||||
if (request.getNumContainers() == 0) {
|
|
||||||
resourceRequestMap.get(schedulerKey).remove(resourceName);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* The {@link ResourceScheduler} is allocating data-local resources to the
|
|
||||||
* application.
|
|
||||||
*/
|
|
||||||
private void allocateRackLocal(SchedulerNode node,
|
|
||||||
SchedulerRequestKey schedulerKey, ResourceRequest rackLocalRequest,
|
|
||||||
List<ResourceRequest> resourceRequests) {
|
|
||||||
// Update future requirements
|
|
||||||
decResourceRequest(node.getRackName(), schedulerKey, rackLocalRequest);
|
|
||||||
|
|
||||||
ResourceRequest offRackRequest = resourceRequestMap.get(schedulerKey).get(
|
|
||||||
ResourceRequest.ANY);
|
|
||||||
decrementOutstanding(offRackRequest, schedulerKey);
|
|
||||||
|
|
||||||
// Update cloned RackLocal and OffRack requests for recovery
|
|
||||||
resourceRequests.add(cloneResourceRequest(rackLocalRequest));
|
|
||||||
resourceRequests.add(cloneResourceRequest(offRackRequest));
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* The {@link ResourceScheduler} is allocating data-local resources to the
|
|
||||||
* application.
|
|
||||||
*/
|
|
||||||
private void allocateOffSwitch(ResourceRequest offSwitchRequest,
|
|
||||||
List<ResourceRequest> resourceRequests,
|
|
||||||
SchedulerRequestKey schedulerKey) {
|
|
||||||
// Update future requirements
|
|
||||||
decrementOutstanding(offSwitchRequest, schedulerKey);
|
|
||||||
// Update cloned OffRack requests for recovery
|
|
||||||
resourceRequests.add(cloneResourceRequest(offSwitchRequest));
|
|
||||||
}
|
|
||||||
|
|
||||||
private void decrementOutstanding(ResourceRequest offSwitchRequest,
|
|
||||||
SchedulerRequestKey schedulerKey) {
|
|
||||||
int numOffSwitchContainers = offSwitchRequest.getNumContainers() - 1;
|
|
||||||
|
|
||||||
// Do not remove ANY
|
|
||||||
offSwitchRequest.setNumContainers(numOffSwitchContainers);
|
|
||||||
|
|
||||||
// Do we have any outstanding requests?
|
|
||||||
// If there is nothing, we need to deactivate this application
|
|
||||||
if (numOffSwitchContainers == 0) {
|
|
||||||
decrementSchedulerKeyReference(schedulerKey);
|
|
||||||
checkForDeactivation();
|
|
||||||
}
|
|
||||||
|
|
||||||
appResourceUsage.decPending(offSwitchRequest.getNodeLabelExpression(),
|
|
||||||
offSwitchRequest.getCapability());
|
|
||||||
queue.decPendingResource(offSwitchRequest.getNodeLabelExpression(),
|
|
||||||
offSwitchRequest.getCapability());
|
|
||||||
}
|
|
||||||
|
|
||||||
private void checkForDeactivation() {
|
|
||||||
if (schedulerKeys.isEmpty()) {
|
if (schedulerKeys.isEmpty()) {
|
||||||
activeUsersManager.deactivateApplication(user, applicationId);
|
activeUsersManager.deactivateApplication(user, applicationId);
|
||||||
}
|
}
|
||||||
|
@ -862,9 +741,9 @@ public class AppSchedulingInfo {
|
||||||
this.writeLock.lock();
|
this.writeLock.lock();
|
||||||
QueueMetrics oldMetrics = queue.getMetrics();
|
QueueMetrics oldMetrics = queue.getMetrics();
|
||||||
QueueMetrics newMetrics = newQueue.getMetrics();
|
QueueMetrics newMetrics = newQueue.getMetrics();
|
||||||
for (Map<String, ResourceRequest> asks : resourceRequestMap.values()) {
|
for (SchedulingPlacementSet ps : schedulerKeyToPlacementSets.values()) {
|
||||||
ResourceRequest request = asks.get(ResourceRequest.ANY);
|
ResourceRequest request = ps.getResourceRequest(ResourceRequest.ANY);
|
||||||
if (request != null) {
|
if (request != null && request.getNumContainers() > 0) {
|
||||||
oldMetrics.decrPendingResources(user, request.getNumContainers(),
|
oldMetrics.decrPendingResources(user, request.getNumContainers(),
|
||||||
request.getCapability());
|
request.getCapability());
|
||||||
newMetrics.incrPendingResources(user, request.getNumContainers(),
|
newMetrics.incrPendingResources(user, request.getNumContainers(),
|
||||||
|
@ -893,9 +772,9 @@ public class AppSchedulingInfo {
|
||||||
try {
|
try {
|
||||||
this.writeLock.lock();
|
this.writeLock.lock();
|
||||||
QueueMetrics metrics = queue.getMetrics();
|
QueueMetrics metrics = queue.getMetrics();
|
||||||
for (Map<String, ResourceRequest> asks : resourceRequestMap.values()) {
|
for (SchedulingPlacementSet ps : schedulerKeyToPlacementSets.values()) {
|
||||||
ResourceRequest request = asks.get(ResourceRequest.ANY);
|
ResourceRequest request = ps.getResourceRequest(ResourceRequest.ANY);
|
||||||
if (request != null) {
|
if (request != null && request.getNumContainers() > 0) {
|
||||||
metrics.decrPendingResources(user, request.getNumContainers(),
|
metrics.decrPendingResources(user, request.getNumContainers(),
|
||||||
request.getCapability());
|
request.getCapability());
|
||||||
|
|
||||||
|
@ -964,18 +843,6 @@ public class AppSchedulingInfo {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
public ResourceRequest cloneResourceRequest(ResourceRequest request) {
|
|
||||||
ResourceRequest newRequest = ResourceRequest.newBuilder()
|
|
||||||
.priority(request.getPriority())
|
|
||||||
.allocationRequestId(request.getAllocationRequestId())
|
|
||||||
.resourceName(request.getResourceName())
|
|
||||||
.capability(request.getCapability())
|
|
||||||
.numContainers(1)
|
|
||||||
.relaxLocality(request.getRelaxLocality())
|
|
||||||
.nodeLabelExpression(request.getNodeLabelExpression()).build();
|
|
||||||
return newRequest;
|
|
||||||
}
|
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* In async environment, pending resource request could be updated during
|
* In async environment, pending resource request could be updated during
|
||||||
* scheduling, this method checks pending request before allocating
|
* scheduling, this method checks pending request before allocating
|
||||||
|
@ -984,34 +851,18 @@ public class AppSchedulingInfo {
|
||||||
SchedulerRequestKey schedulerKey) {
|
SchedulerRequestKey schedulerKey) {
|
||||||
try {
|
try {
|
||||||
readLock.lock();
|
readLock.lock();
|
||||||
ResourceRequest r = resourceRequestMap.get(schedulerKey).get(
|
SchedulingPlacementSet ps = schedulerKeyToPlacementSets.get(schedulerKey);
|
||||||
ResourceRequest.ANY);
|
if (null == ps) {
|
||||||
if (r == null || r.getNumContainers() <= 0) {
|
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
if (type == NodeType.RACK_LOCAL || type == NodeType.NODE_LOCAL) {
|
return ps.canAllocate(type, node);
|
||||||
r = resourceRequestMap.get(schedulerKey).get(node.getRackName());
|
|
||||||
if (r == null || r.getNumContainers() <= 0) {
|
|
||||||
return false;
|
|
||||||
}
|
|
||||||
if (type == NodeType.NODE_LOCAL) {
|
|
||||||
r = resourceRequestMap.get(schedulerKey).get(node.getNodeName());
|
|
||||||
if (r == null || r.getNumContainers() <= 0) {
|
|
||||||
return false;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
return true;
|
|
||||||
} finally {
|
} finally {
|
||||||
readLock.unlock();
|
readLock.unlock();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
public void updateMetricsForAllocatedContainer(
|
private void updateMetricsForAllocatedContainer(
|
||||||
ResourceRequest request, NodeType type, Container containerAllocated) {
|
NodeType type, Container containerAllocated) {
|
||||||
try {
|
|
||||||
writeLock.lock();
|
|
||||||
QueueMetrics metrics = queue.getMetrics();
|
QueueMetrics metrics = queue.getMetrics();
|
||||||
if (pending) {
|
if (pending) {
|
||||||
// once an allocation is done we assume the application is
|
// once an allocation is done we assume the application is
|
||||||
|
@ -1023,68 +874,20 @@ public class AppSchedulingInfo {
|
||||||
if (LOG.isDebugEnabled()) {
|
if (LOG.isDebugEnabled()) {
|
||||||
LOG.debug("allocate: applicationId=" + applicationId + " container="
|
LOG.debug("allocate: applicationId=" + applicationId + " container="
|
||||||
+ containerAllocated.getId() + " host=" + containerAllocated
|
+ containerAllocated.getId() + " host=" + containerAllocated
|
||||||
.getNodeId().toString() + " user=" + user + " resource=" + request
|
.getNodeId().toString() + " user=" + user + " resource="
|
||||||
.getCapability() + " type=" + type);
|
+ containerAllocated.getResource() + " type="
|
||||||
|
+ type);
|
||||||
}
|
}
|
||||||
metrics.allocateResources(user, 1, request.getCapability(), true);
|
metrics.allocateResources(user, 1, containerAllocated.getResource(),
|
||||||
|
true);
|
||||||
metrics.incrNodeTypeAggregations(user, type);
|
metrics.incrNodeTypeAggregations(user, type);
|
||||||
} finally {
|
|
||||||
writeLock.unlock();
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// Get placement-set by specified schedulerKey
|
// Get placement-set by specified schedulerKey
|
||||||
// Now simply return all node of the input clusterPlacementSet
|
// Now simply return all node of the input clusterPlacementSet
|
||||||
// TODO, need update this when we support global scheduling
|
|
||||||
public <N extends SchedulerNode> SchedulingPlacementSet<N> getSchedulingPlacementSet(
|
public <N extends SchedulerNode> SchedulingPlacementSet<N> getSchedulingPlacementSet(
|
||||||
SchedulerRequestKey schedulerkey) {
|
SchedulerRequestKey schedulerkey) {
|
||||||
return new SchedulingPlacementSet<N>() {
|
return (SchedulingPlacementSet<N>) schedulerKeyToPlacementSets.get(
|
||||||
@Override
|
schedulerkey);
|
||||||
@SuppressWarnings("unchecked")
|
|
||||||
public Iterator<N> getPreferredNodeIterator(
|
|
||||||
PlacementSet<N> clusterPlacementSet) {
|
|
||||||
return IteratorUtils.singletonIterator(
|
|
||||||
clusterPlacementSet.getAllNodes().values().iterator().next());
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public ResourceRequestUpdateResult updateResourceRequests(
|
|
||||||
List<ResourceRequest> requests,
|
|
||||||
boolean recoverPreemptedRequestForAContainer) {
|
|
||||||
return null;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public Map<String, ResourceRequest> getResourceRequests() {
|
|
||||||
return null;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public ResourceRequest getResourceRequest(String resourceName,
|
|
||||||
SchedulerRequestKey requestKey) {
|
|
||||||
return null;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public List<ResourceRequest> allocate(NodeType type, SchedulerNode node,
|
|
||||||
ResourceRequest request) {
|
|
||||||
return null;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public Map<NodeId, N> getAllNodes() {
|
|
||||||
return null;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public long getVersion() {
|
|
||||||
return 0;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public String getPartition() {
|
|
||||||
return null;
|
|
||||||
}
|
|
||||||
};
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -0,0 +1,311 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement;
|
||||||
|
|
||||||
|
import org.apache.commons.collections.IteratorUtils;
|
||||||
|
import org.apache.hadoop.yarn.api.records.ResourceRequest;
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AppSchedulingInfo;
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType;
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode;
|
||||||
|
import org.apache.hadoop.yarn.server.scheduler.SchedulerRequestKey;
|
||||||
|
|
||||||
|
import java.util.ArrayList;
|
||||||
|
import java.util.Collection;
|
||||||
|
import java.util.Iterator;
|
||||||
|
import java.util.List;
|
||||||
|
import java.util.Map;
|
||||||
|
import java.util.concurrent.ConcurrentHashMap;
|
||||||
|
import java.util.concurrent.locks.ReentrantReadWriteLock;
|
||||||
|
|
||||||
|
/**
 * A {@link SchedulingPlacementSet} implementation that tracks pending
 * {@link ResourceRequest}s at the three classic locality levels
 * (node-local, rack-local, off-switch / ANY), keyed by resource name.
 *
 * <p>All mutations of the request map and of pending-resource accounting in
 * {@link AppSchedulingInfo} are guarded by an internal read/write lock.
 * NOTE(review): callers appear to rely on this class for per-scheduler-key
 * request bookkeeping; confirm the locking contract against AppSchedulingInfo.
 */
public class LocalitySchedulingPlacementSet<N extends SchedulerNode>
    implements SchedulingPlacementSet<N> {
  // Pending requests keyed by resource name (host, rack, or ResourceRequest.ANY).
  private final Map<String, ResourceRequest> resourceRequestMap =
      new ConcurrentHashMap<>();
  // Owning application's scheduling state; used for pending-resource and
  // scheduler-key accounting when requests are satisfied.
  private AppSchedulingInfo appSchedulingInfo;

  // Read lock guards lookups in canAllocate; write lock guards request updates.
  private final ReentrantReadWriteLock.ReadLock readLock;
  private final ReentrantReadWriteLock.WriteLock writeLock;

  /**
   * Creates a placement set bound to the given application scheduling info.
   *
   * @param info the application's scheduling state to update on allocation
   */
  public LocalitySchedulingPlacementSet(AppSchedulingInfo info) {
    ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
    readLock = lock.readLock();
    writeLock = lock.writeLock();
    this.appSchedulingInfo = info;
  }

  /**
   * Returns an iterator over preferred nodes for this request.
   * Currently only supports a single-node cluster placement set: yields that
   * node if present, otherwise an empty iterator.
   *
   * @param clusterPlacementSet candidate nodes from the scheduler
   * @return iterator over at most one preferred node
   */
  @Override
  @SuppressWarnings("unchecked")
  public Iterator<N> getPreferredNodeIterator(
      PlacementSet<N> clusterPlacementSet) {
    // Now only handle the case that single node in placementSet
    // TODO, Add support to multi-hosts inside placement-set which is passed in.

    N singleNode = PlacementSetUtils.getSingleNode(clusterPlacementSet);
    if (null != singleNode) {
      return IteratorUtils.singletonIterator(singleNode);
    }

    return IteratorUtils.emptyIterator();
  }

  /**
   * Returns true if the node-label expression differs between the two
   * requests, treating a null-to-non-null transition as a change.
   */
  private boolean hasRequestLabelChanged(ResourceRequest requestOne,
      ResourceRequest requestTwo) {
    String requestOneLabelExp = requestOne.getNodeLabelExpression();
    String requestTwoLabelExp = requestTwo.getNodeLabelExpression();
    // First request label expression can be null and second request
    // is not null then we have to consider it as changed.
    if ((null == requestOneLabelExp) && (null != requestTwoLabelExp)) {
      return true;
    }
    // If the label is not matching between both request when
    // requestOneLabelExp is not null.
    return ((null != requestOneLabelExp) && !(requestOneLabelExp
        .equals(requestTwoLabelExp)));
  }

  /**
   * Propagates the node-label expression of the ANY request to all other
   * stored requests (when the ANY label changed), or copies the current ANY
   * label onto a newly arriving non-ANY request.
   *
   * @param request the incoming request being added/updated
   */
  private void updateNodeLabels(ResourceRequest request) {
    String resourceName = request.getResourceName();
    if (resourceName.equals(ResourceRequest.ANY)) {
      ResourceRequest previousAnyRequest =
          getResourceRequest(resourceName);

      // When there is change in ANY request label expression, we should
      // update label for all resource requests already added of same
      // priority as ANY resource request.
      if ((null == previousAnyRequest) || hasRequestLabelChanged(
          previousAnyRequest, request)) {
        for (ResourceRequest r : resourceRequestMap.values()) {
          if (!r.getResourceName().equals(ResourceRequest.ANY)) {
            r.setNodeLabelExpression(request.getNodeLabelExpression());
          }
        }
      }
    } else{
      ResourceRequest anyRequest = getResourceRequest(ResourceRequest.ANY);
      if (anyRequest != null) {
        request.setNodeLabelExpression(anyRequest.getNodeLabelExpression());
      }
    }
  }

  /**
   * Replaces/updates stored resource requests under the write lock.
   * For each request: synchronizes node labels, optionally bumps the
   * container count when recovering a preempted container, stores the
   * request, and — for the ANY request — records the requested partition on
   * the application.
   *
   * @param requests incoming requests to apply
   * @param recoverPreemptedRequestForAContainer if true, add one container
   *        back onto the previous request's count before storing
   * @return update result for the ANY request (last-seen vs new), or null if
   *         no ANY request was among the inputs
   */
  @Override
  public ResourceRequestUpdateResult updateResourceRequests(
      Collection<ResourceRequest> requests,
      boolean recoverPreemptedRequestForAContainer) {
    try {
      this.writeLock.lock();

      ResourceRequestUpdateResult updateResult = null;

      // Update resource requests
      for (ResourceRequest request : requests) {
        String resourceName = request.getResourceName();

        // Update node labels if required
        updateNodeLabels(request);

        // Increment number of containers if recovering preempted resources
        ResourceRequest lastRequest = resourceRequestMap.get(resourceName);
        if (recoverPreemptedRequestForAContainer && lastRequest != null) {
          request.setNumContainers(lastRequest.getNumContainers() + 1);
        }

        // Update asks
        resourceRequestMap.put(resourceName, request);

        if (resourceName.equals(ResourceRequest.ANY)) {
          //update the applications requested labels set
          appSchedulingInfo.addRequestedPartition(
              request.getNodeLabelExpression() == null ?
                  RMNodeLabelsManager.NO_LABEL :
                  request.getNodeLabelExpression());

          updateResult = new ResourceRequestUpdateResult(lastRequest, request);
        }
      }
      return updateResult;
    } finally {
      this.writeLock.unlock();
    }
  }

  /**
   * Returns the live internal request map keyed by resource name.
   * NOTE(review): this exposes the mutable internal map to callers — confirm
   * callers treat it as read-only.
   */
  @Override
  public Map<String, ResourceRequest> getResourceRequests() {
    return resourceRequestMap;
  }

  /**
   * Looks up the stored request for the given resource name (host, rack, or
   * {@link ResourceRequest#ANY}); null if absent.
   */
  @Override
  public ResourceRequest getResourceRequest(String resourceName) {
    return resourceRequestMap.get(resourceName);
  }

  /**
   * Decrements the ANY (off-switch) request by one container and updates the
   * application's pending-resource accounting; when the count reaches zero,
   * releases the scheduler-key reference and triggers a deactivation check.
   * The ANY entry itself is never removed from the map.
   */
  private void decrementOutstanding(ResourceRequest offSwitchRequest) {
    int numOffSwitchContainers = offSwitchRequest.getNumContainers() - 1;

    // Do not remove ANY
    offSwitchRequest.setNumContainers(numOffSwitchContainers);

    // Do we have any outstanding requests?
    // If there is nothing, we need to deactivate this application
    if (numOffSwitchContainers == 0) {
      SchedulerRequestKey schedulerRequestKey = SchedulerRequestKey.create(
          offSwitchRequest);
      appSchedulingInfo.decrementSchedulerKeyReference(schedulerRequestKey);
      appSchedulingInfo.checkForDeactivation();
    }

    appSchedulingInfo.decPendingResource(
        offSwitchRequest.getNodeLabelExpression(),
        offSwitchRequest.getCapability());
  }

  /**
   * Returns a single-container copy of the given request (numContainers
   * fixed at 1), preserving priority, resource name, capability, relax
   * locality and node-label expression; used to record what was deducted
   * so it can be recovered later.
   */
  private ResourceRequest cloneResourceRequest(ResourceRequest request) {
    ResourceRequest newRequest =
        ResourceRequest.newInstance(request.getPriority(),
            request.getResourceName(), request.getCapability(), 1,
            request.getRelaxLocality(), request.getNodeLabelExpression());
    return newRequest;
  }

  /**
   * The {@link ResourceScheduler} is allocating data-local resources to the
   * application.
   */
  private void allocateRackLocal(SchedulerNode node,
      ResourceRequest rackLocalRequest,
      List<ResourceRequest> resourceRequests) {
    // Update future requirements
    decResourceRequest(node.getRackName(), rackLocalRequest);

    ResourceRequest offRackRequest = resourceRequestMap.get(
        ResourceRequest.ANY);
    decrementOutstanding(offRackRequest);

    // Update cloned RackLocal and OffRack requests for recovery
    resourceRequests.add(cloneResourceRequest(rackLocalRequest));
    resourceRequests.add(cloneResourceRequest(offRackRequest));
  }

  /**
   * The {@link ResourceScheduler} is allocating data-local resources to the
   * application.
   */
  private void allocateOffSwitch(ResourceRequest offSwitchRequest,
      List<ResourceRequest> resourceRequests) {
    // Update future requirements
    decrementOutstanding(offSwitchRequest);
    // Update cloned OffRack requests for recovery
    resourceRequests.add(cloneResourceRequest(offSwitchRequest));
  }


  /**
   * The {@link ResourceScheduler} is allocating data-local resources to the
   * application.
   */
  private void allocateNodeLocal(SchedulerNode node,
      ResourceRequest nodeLocalRequest,
      List<ResourceRequest> resourceRequests) {
    // Update future requirements
    decResourceRequest(node.getNodeName(), nodeLocalRequest);

    ResourceRequest rackLocalRequest = resourceRequestMap.get(
        node.getRackName());
    decResourceRequest(node.getRackName(), rackLocalRequest);

    ResourceRequest offRackRequest = resourceRequestMap.get(
        ResourceRequest.ANY);
    decrementOutstanding(offRackRequest);

    // Update cloned NodeLocal, RackLocal and OffRack requests for recovery
    resourceRequests.add(cloneResourceRequest(nodeLocalRequest));
    resourceRequests.add(cloneResourceRequest(rackLocalRequest));
    resourceRequests.add(cloneResourceRequest(offRackRequest));
  }

  /**
   * Decrements the container count of a host- or rack-level request and
   * drops the map entry when it reaches zero (unlike the ANY entry, which
   * is kept — see {@link #decrementOutstanding}).
   */
  private void decResourceRequest(String resourceName,
      ResourceRequest request) {
    request.setNumContainers(request.getNumContainers() - 1);
    if (request.getNumContainers() == 0) {
      resourceRequestMap.remove(resourceName);
    }
  }

  /**
   * Checks, under the read lock, whether pending containers remain at the
   * given locality level for the given node: ANY must always be positive;
   * RACK_LOCAL additionally requires a positive rack entry, and NODE_LOCAL
   * requires both rack and node entries to be positive.
   *
   * @param type locality level requested
   * @param node candidate node
   * @return true if an allocation of this type on this node is still wanted
   */
  @Override
  public boolean canAllocate(NodeType type, SchedulerNode node) {
    try {
      readLock.lock();
      ResourceRequest r = resourceRequestMap.get(
          ResourceRequest.ANY);
      if (r == null || r.getNumContainers() <= 0) {
        return false;
      }
      if (type == NodeType.RACK_LOCAL || type == NodeType.NODE_LOCAL) {
        r = resourceRequestMap.get(node.getRackName());
        if (r == null || r.getNumContainers() <= 0) {
          return false;
        }
        if (type == NodeType.NODE_LOCAL) {
          r = resourceRequestMap.get(node.getNodeName());
          if (r == null || r.getNumContainers() <= 0) {
            return false;
          }
        }
      }

      return true;
    } finally {
      readLock.unlock();
    }
  }

  /**
   * Records a container allocation of the given locality type on the given
   * node, under the write lock. If {@code request} is null it is looked up
   * from the map by the node/rack/ANY name appropriate to {@code type}.
   * Deducts pending counts at this level and all less-specific levels, and
   * returns single-container clones of each deducted request (for recovery
   * on rollback/preemption).
   *
   * @param type locality level of the allocation
   * @param node node the container was placed on
   * @param request the request being satisfied, or null to look it up
   * @return clones of the requests that were deducted
   */
  @Override
  public List<ResourceRequest> allocate(NodeType type, SchedulerNode node,
      ResourceRequest request) {
    try {
      writeLock.lock();

      List<ResourceRequest> resourceRequests = new ArrayList<>();

      if (null == request) {
        if (type == NodeType.NODE_LOCAL) {
          request = resourceRequestMap.get(node.getNodeName());
        } else if (type == NodeType.RACK_LOCAL) {
          request = resourceRequestMap.get(node.getRackName());
        } else{
          request = resourceRequestMap.get(ResourceRequest.ANY);
        }
      }

      if (type == NodeType.NODE_LOCAL) {
        allocateNodeLocal(node, request, resourceRequests);
      } else if (type == NodeType.RACK_LOCAL) {
        allocateRackLocal(node, request, resourceRequests);
      } else{
        allocateOffSwitch(request, resourceRequests);
      }

      return resourceRequests;
    } finally {
      writeLock.unlock();
    }
  }
}
|
|
@ -23,13 +23,14 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode;
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode;
|
||||||
import org.apache.hadoop.yarn.server.scheduler.SchedulerRequestKey;
|
import org.apache.hadoop.yarn.server.scheduler.SchedulerRequestKey;
|
||||||
|
|
||||||
|
import java.util.Collection;
|
||||||
import java.util.Iterator;
|
import java.util.Iterator;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* <p>
|
* <p>
|
||||||
* In addition to {@link PlacementSet}, this also maintains
|
* Comparing to {@link PlacementSet}, this also maintains
|
||||||
* pending ResourceRequests:
|
* pending ResourceRequests:
|
||||||
* - When new ResourceRequest(s) added to scheduler, or,
|
* - When new ResourceRequest(s) added to scheduler, or,
|
||||||
* - Or new container allocated, scheduler can notify corresponding
|
* - Or new container allocated, scheduler can notify corresponding
|
||||||
|
@ -42,8 +43,7 @@ import java.util.Map;
|
||||||
* can have different ways to order nodes depends on requests.
|
* can have different ways to order nodes depends on requests.
|
||||||
* </p>
|
* </p>
|
||||||
*/
|
*/
|
||||||
public interface SchedulingPlacementSet<N extends SchedulerNode>
|
public interface SchedulingPlacementSet<N extends SchedulerNode> {
|
||||||
extends PlacementSet<N> {
|
|
||||||
/**
|
/**
|
||||||
* Get iterator of preferred node depends on requirement and/or availability
|
* Get iterator of preferred node depends on requirement and/or availability
|
||||||
* @param clusterPlacementSet input cluster PlacementSet
|
* @param clusterPlacementSet input cluster PlacementSet
|
||||||
|
@ -60,7 +60,7 @@ public interface SchedulingPlacementSet<N extends SchedulerNode>
|
||||||
* @return true if total pending resource changed
|
* @return true if total pending resource changed
|
||||||
*/
|
*/
|
||||||
ResourceRequestUpdateResult updateResourceRequests(
|
ResourceRequestUpdateResult updateResourceRequests(
|
||||||
List<ResourceRequest> requests,
|
Collection<ResourceRequest> requests,
|
||||||
boolean recoverPreemptedRequestForAContainer);
|
boolean recoverPreemptedRequestForAContainer);
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -72,19 +72,25 @@ public interface SchedulingPlacementSet<N extends SchedulerNode>
|
||||||
/**
|
/**
|
||||||
* Get ResourceRequest by given schedulerKey and resourceName
|
* Get ResourceRequest by given schedulerKey and resourceName
|
||||||
* @param resourceName resourceName
|
* @param resourceName resourceName
|
||||||
* @param schedulerRequestKey schedulerRequestKey
|
|
||||||
* @return ResourceRequest
|
* @return ResourceRequest
|
||||||
*/
|
*/
|
||||||
ResourceRequest getResourceRequest(String resourceName,
|
ResourceRequest getResourceRequest(String resourceName);
|
||||||
SchedulerRequestKey schedulerRequestKey);
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Notify container allocated.
|
* Notify container allocated.
|
||||||
* @param type Type of the allocation
|
* @param type Type of the allocation
|
||||||
* @param node Which node this container allocated on
|
* @param node Which node this container allocated on
|
||||||
* @param request resource request
|
* @param request Which resource request to allocate
|
||||||
* @return list of ResourceRequests deducted
|
* @return list of ResourceRequests deducted
|
||||||
*/
|
*/
|
||||||
List<ResourceRequest> allocate(NodeType type, SchedulerNode node,
|
List<ResourceRequest> allocate(NodeType type, SchedulerNode node,
|
||||||
ResourceRequest request);
|
ResourceRequest request);
|
||||||
|
|
||||||
|
/**
|
||||||
|
* We can still have pending requirement for a given NodeType and node
|
||||||
|
* @param type Locality Type
|
||||||
|
* @param node which node we will allocate on
|
||||||
|
* @return true if we has pending requirement
|
||||||
|
*/
|
||||||
|
boolean canAllocate(NodeType type, SchedulerNode node);
|
||||||
}
|
}
|
||||||
|
|
|
@ -687,6 +687,9 @@ public class TestApplicationLimitsByPartition {
|
||||||
List<ResourceRequest> app_0_1_requests = new ArrayList<ResourceRequest>();
|
List<ResourceRequest> app_0_1_requests = new ArrayList<ResourceRequest>();
|
||||||
app_0_1_requests.add(TestUtils.createResourceRequest(ResourceRequest.ANY,
|
app_0_1_requests.add(TestUtils.createResourceRequest(ResourceRequest.ANY,
|
||||||
1 * GB, 2, true, priority_1, recordFactory));
|
1 * GB, 2, true, priority_1, recordFactory));
|
||||||
|
app_0_1.updateResourceRequests(app_0_1_requests);
|
||||||
|
|
||||||
|
app_0_1_requests.clear();
|
||||||
app_0_1_requests.add(TestUtils.createResourceRequest(ResourceRequest.ANY,
|
app_0_1_requests.add(TestUtils.createResourceRequest(ResourceRequest.ANY,
|
||||||
1 * GB, 2, true, priority_1, recordFactory, "y"));
|
1 * GB, 2, true, priority_1, recordFactory, "y"));
|
||||||
app_0_1.updateResourceRequests(app_0_1_requests);
|
app_0_1.updateResourceRequests(app_0_1_requests);
|
||||||
|
@ -715,6 +718,9 @@ public class TestApplicationLimitsByPartition {
|
||||||
List<ResourceRequest> app_1_0_requests = new ArrayList<ResourceRequest>();
|
List<ResourceRequest> app_1_0_requests = new ArrayList<ResourceRequest>();
|
||||||
app_1_0_requests.add(TestUtils.createResourceRequest(ResourceRequest.ANY,
|
app_1_0_requests.add(TestUtils.createResourceRequest(ResourceRequest.ANY,
|
||||||
1 * GB, 2, true, priority_1, recordFactory));
|
1 * GB, 2, true, priority_1, recordFactory));
|
||||||
|
app_1_0.updateResourceRequests(app_1_0_requests);
|
||||||
|
|
||||||
|
app_1_0_requests.clear();
|
||||||
app_1_0_requests.add(TestUtils.createResourceRequest(ResourceRequest.ANY,
|
app_1_0_requests.add(TestUtils.createResourceRequest(ResourceRequest.ANY,
|
||||||
1 * GB, 2, true, priority_1, recordFactory, "y"));
|
1 * GB, 2, true, priority_1, recordFactory, "y"));
|
||||||
app_1_0.updateResourceRequests(app_1_0_requests);
|
app_1_0.updateResourceRequests(app_1_0_requests);
|
||||||
|
|
Loading…
Reference in New Issue