YARN-4524. Cleanup AppSchedulingInfo. (Karthik Kambatla via wangda)
Commit 05fa852d75 (parent 6eefae1b33)
CHANGES.txt

@@ -26,6 +26,8 @@ Release 2.9.0 - UNRELEASED
 
     YARN-4522. Queue acl can be checked at app submission. (Jian He via wangda)
 
+    YARN-4524. Cleanup AppSchedulingInfo. (Karthik Kambatla via wangda)
+
   OPTIMIZATIONS
 
   BUG FIXES
org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java

@@ -20,6 +20,7 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler;
 
 import java.util.ArrayList;
 import java.util.Collection;
+import java.util.Comparator;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
@@ -42,7 +43,6 @@ import org.apache.hadoop.yarn.api.records.NodeId;
 import org.apache.hadoop.yarn.api.records.Priority;
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.api.records.ResourceRequest;
-import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState;
 import org.apache.hadoop.yarn.util.resource.Resources;
@@ -56,40 +56,36 @@ import org.apache.hadoop.yarn.util.resource.Resources;
 public class AppSchedulingInfo {
 
   private static final Log LOG = LogFactory.getLog(AppSchedulingInfo.class);
+  private static final Comparator COMPARATOR =
+      new org.apache.hadoop.yarn.server.resourcemanager.resource.Priority.Comparator();
+  private static final int EPOCH_BIT_SHIFT = 40;
 
+  private final ApplicationId applicationId;
   private final ApplicationAttemptId applicationAttemptId;
-  final ApplicationId applicationId;
-  private String queueName;
-  Queue queue;
-  final String user;
-  // TODO making containerIdCounter long
   private final AtomicLong containerIdCounter;
-  private final int EPOCH_BIT_SHIFT = 40;
+  private final String user;
 
-  final Set<Priority> priorities = new TreeSet<Priority>(
-      new org.apache.hadoop.yarn.server.resourcemanager.resource.Priority.Comparator());
-  final Map<Priority, Map<String, ResourceRequest>> resourceRequestMap =
-      new ConcurrentHashMap<Priority, Map<String, ResourceRequest>>();
-  final Map<NodeId, Map<Priority, Map<ContainerId,
-      SchedContainerChangeRequest>>> increaseRequestMap =
-      new ConcurrentHashMap<>();
-  private Set<String> userBlacklist = new HashSet<>();
-  private Set<String> amBlacklist = new HashSet<>();
-
-  //private final ApplicationStore store;
+  private Queue queue;
   private ActiveUsersManager activeUsersManager;
-  /* Allocated by scheduler */
-  boolean pending = true; // for app metrics
-
+  private boolean pending = true; // whether accepted/allocated by scheduler
+
   private ResourceUsage appResourceUsage;
 
+  private final Set<String> amBlacklist = new HashSet<>();
+  private Set<String> userBlacklist = new HashSet<>();
+
+  final Set<Priority> priorities = new TreeSet<>(COMPARATOR);
+  final Map<Priority, Map<String, ResourceRequest>> resourceRequestMap =
+      new ConcurrentHashMap<>();
+  final Map<NodeId, Map<Priority, Map<ContainerId,
+      SchedContainerChangeRequest>>> containerIncreaseRequestMap =
+      new ConcurrentHashMap<>();
+
   public AppSchedulingInfo(ApplicationAttemptId appAttemptId,
       String user, Queue queue, ActiveUsersManager activeUsersManager,
       long epoch, ResourceUsage appResourceUsage) {
     this.applicationAttemptId = appAttemptId;
     this.applicationId = appAttemptId.getApplicationId();
     this.queue = queue;
-    this.queueName = queue.getQueueName();
     this.user = user;
     this.activeUsersManager = activeUsersManager;
     this.containerIdCounter = new AtomicLong(epoch << EPOCH_BIT_SHIFT);
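The epoch scheme behind the hoisted EPOCH_BIT_SHIFT constant and containerIdCounter can be illustrated with a minimal standalone sketch (a hypothetical class, not part of this patch): the epoch seeds the upper bits of an AtomicLong, and getNewContainerId() just increments the lower 40 bits, so IDs issued after a ResourceManager restart cannot collide with earlier ones.

import java.util.concurrent.atomic.AtomicLong;

// Standalone sketch, not the YARN classes themselves.
public class ContainerIdSketch {
  private static final int EPOCH_BIT_SHIFT = 40;
  private final AtomicLong containerIdCounter;

  public ContainerIdSketch(long epoch) {
    // Seed the counter with the epoch in the high bits, mirroring
    // "new AtomicLong(epoch << EPOCH_BIT_SHIFT)" in the diff above.
    this.containerIdCounter = new AtomicLong(epoch << EPOCH_BIT_SHIFT);
  }

  public long getNewContainerId() {
    return containerIdCounter.incrementAndGet();
  }

  public static void main(String[] args) {
    ContainerIdSketch ids = new ContainerIdSketch(2);
    // First ID for epoch 2 is (2 << 40) + 1 = 2199023255553
    System.out.println(ids.getNewContainerId());
    System.out.println(ids.getNewContainerId());
  }
}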
@@ -104,14 +100,18 @@ public class AppSchedulingInfo {
     return applicationAttemptId;
   }
 
-  public String getQueueName() {
-    return queueName;
-  }
-
   public String getUser() {
     return user;
   }
 
+  public long getNewContainerId() {
+    return this.containerIdCounter.incrementAndGet();
+  }
+
+  public synchronized String getQueueName() {
+    return queue.getQueueName();
+  }
+
   public synchronized boolean isPending() {
     return pending;
   }
@@ -125,30 +125,23 @@ public class AppSchedulingInfo {
     LOG.info("Application " + applicationId + " requests cleared");
   }
 
-  public long getNewContainerId() {
-    return this.containerIdCounter.incrementAndGet();
-  }
-
-  public boolean hasIncreaseRequest(NodeId nodeId) {
+  public synchronized boolean hasIncreaseRequest(NodeId nodeId) {
     Map<Priority, Map<ContainerId, SchedContainerChangeRequest>> requestsOnNode =
-        increaseRequestMap.get(nodeId);
-    if (null == requestsOnNode) {
-      return false;
-    }
-    return requestsOnNode.size() > 0;
+        containerIncreaseRequestMap.get(nodeId);
+    return requestsOnNode == null ? false : requestsOnNode.size() > 0;
   }
 
-  public Map<ContainerId, SchedContainerChangeRequest>
+  public synchronized Map<ContainerId, SchedContainerChangeRequest>
       getIncreaseRequests(NodeId nodeId, Priority priority) {
     Map<Priority, Map<ContainerId, SchedContainerChangeRequest>> requestsOnNode =
-        increaseRequestMap.get(nodeId);
-    if (null == requestsOnNode) {
-      return null;
-    }
-
-    return requestsOnNode.get(priority);
+        containerIncreaseRequestMap.get(nodeId);
+    return requestsOnNode == null ? null : requestsOnNode.get(priority);
  }
 
+  /**
+   * return true if any of the existing increase requests are updated,
+   *        false if none of them are updated
+   */
   public synchronized boolean updateIncreaseRequests(
       List<SchedContainerChangeRequest> increaseRequests) {
     boolean resourceUpdated = false;
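The cleanup above collapses early-return null checks on the two-level increase-request map into single ternaries. A minimal standalone sketch of that lookup pattern, with placeholder String/Integer keys standing in for NodeId and Priority (hypothetical class, not part of the patch):

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Standalone sketch of the null-safe two-level lookup used by
// hasIncreaseRequest() and getIncreaseRequests() above.
public class IncreaseRequestLookupSketch {
  // node -> priority -> (containerId -> request payload)
  private final Map<String, Map<Integer, Map<String, String>>> increaseRequests =
      new ConcurrentHashMap<>();

  public boolean hasIncreaseRequest(String node) {
    Map<Integer, Map<String, String>> onNode = increaseRequests.get(node);
    return onNode == null ? false : onNode.size() > 0;
  }

  public Map<String, String> getIncreaseRequests(String node, int priority) {
    Map<Integer, Map<String, String>> onNode = increaseRequests.get(node);
    return onNode == null ? null : onNode.get(priority);
  }

  public static void main(String[] args) {
    IncreaseRequestLookupSketch sketch = new IncreaseRequestLookupSketch();
    System.out.println(sketch.hasIncreaseRequest("node1"));     // false
    System.out.println(sketch.getIncreaseRequests("node1", 1)); // null
  }
}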
@@ -157,10 +150,10 @@ public class AppSchedulingInfo {
       NodeId nodeId = r.getRMContainer().getAllocatedNode();
 
       Map<Priority, Map<ContainerId, SchedContainerChangeRequest>> requestsOnNode =
-          increaseRequestMap.get(nodeId);
+          containerIncreaseRequestMap.get(nodeId);
       if (null == requestsOnNode) {
         requestsOnNode = new TreeMap<>();
-        increaseRequestMap.put(nodeId, requestsOnNode);
+        containerIncreaseRequestMap.put(nodeId, requestsOnNode);
       }
 
       SchedContainerChangeRequest prevChangeRequest =
@@ -168,22 +161,21 @@ public class AppSchedulingInfo {
       if (null != prevChangeRequest) {
         if (Resources.equals(prevChangeRequest.getTargetCapacity(),
             r.getTargetCapacity())) {
-          // New target capacity is as same as what we have, just ignore the new
-          // one
+          // increase request hasn't changed
           continue;
         }
 
-        // remove the old one
+        // remove the old one, as we will use the new one going forward
         removeIncreaseRequest(nodeId, prevChangeRequest.getPriority(),
             prevChangeRequest.getContainerId());
       }
 
-      if (Resources.equals(r.getTargetCapacity(), r.getRMContainer().getAllocatedResource())) {
+      if (Resources.equals(r.getTargetCapacity(),
+          r.getRMContainer().getAllocatedResource())) {
         if (LOG.isDebugEnabled()) {
-          LOG.debug("Trying to increase/decrease container, "
-              + "target capacity = previous capacity = " + prevChangeRequest
-              + " for container=" + r.getContainerId()
-              + ". Will ignore this increase request");
+          LOG.debug("Trying to increase container " + r.getContainerId()
+              + ", target capacity = previous capacity = " + prevChangeRequest
+              + ". Will ignore this increase request.");
         }
         continue;
       }
@@ -195,25 +187,26 @@ public class AppSchedulingInfo {
     return resourceUpdated;
   }
 
-  // insert increase request and add missing hierarchy if missing
+  /**
+   * Insert increase request, adding any missing items in the data-structure
+   * hierarchy.
+   */
   private void insertIncreaseRequest(SchedContainerChangeRequest request) {
     NodeId nodeId = request.getNodeId();
     Priority priority = request.getPriority();
     ContainerId containerId = request.getContainerId();
 
     Map<Priority, Map<ContainerId, SchedContainerChangeRequest>> requestsOnNode =
-        increaseRequestMap.get(nodeId);
+        containerIncreaseRequestMap.get(nodeId);
     if (null == requestsOnNode) {
-      requestsOnNode =
-          new HashMap<Priority, Map<ContainerId, SchedContainerChangeRequest>>();
-      increaseRequestMap.put(nodeId, requestsOnNode);
+      requestsOnNode = new HashMap<>();
+      containerIncreaseRequestMap.put(nodeId, requestsOnNode);
     }
 
     Map<ContainerId, SchedContainerChangeRequest> requestsOnNodeWithPriority =
         requestsOnNode.get(priority);
     if (null == requestsOnNodeWithPriority) {
-      requestsOnNodeWithPriority =
-          new TreeMap<ContainerId, SchedContainerChangeRequest>();
+      requestsOnNodeWithPriority = new TreeMap<>();
       requestsOnNode.put(priority, requestsOnNodeWithPriority);
     }
 
@@ -237,7 +230,7 @@ public class AppSchedulingInfo {
   public synchronized boolean removeIncreaseRequest(NodeId nodeId, Priority priority,
       ContainerId containerId) {
     Map<Priority, Map<ContainerId, SchedContainerChangeRequest>> requestsOnNode =
-        increaseRequestMap.get(nodeId);
+        containerIncreaseRequestMap.get(nodeId);
     if (null == requestsOnNode) {
       return false;
     }
@@ -256,7 +249,7 @@ public class AppSchedulingInfo {
       requestsOnNode.remove(priority);
     }
     if (requestsOnNode.isEmpty()) {
-      increaseRequestMap.remove(nodeId);
+      containerIncreaseRequestMap.remove(nodeId);
     }
 
     if (request == null) {
@@ -279,18 +272,15 @@ public class AppSchedulingInfo {
   public SchedContainerChangeRequest getIncreaseRequest(NodeId nodeId,
       Priority priority, ContainerId containerId) {
     Map<Priority, Map<ContainerId, SchedContainerChangeRequest>> requestsOnNode =
-        increaseRequestMap.get(nodeId);
+        containerIncreaseRequestMap.get(nodeId);
     if (null == requestsOnNode) {
       return null;
     }
 
     Map<ContainerId, SchedContainerChangeRequest> requestsOnNodeWithPriority =
         requestsOnNode.get(priority);
-    if (null == requestsOnNodeWithPriority) {
-      return null;
-    }
-
-    return requestsOnNodeWithPriority.get(containerId);
+    return requestsOnNodeWithPriority == null ? null
+        : requestsOnNodeWithPriority.get(containerId);
   }
 
   /**
@@ -299,39 +289,92 @@ public class AppSchedulingInfo {
    * by the application.
    *
    * @param requests resources to be acquired
-   * @param recoverPreemptedRequest recover Resource Request on preemption
-   * @return true if any resource was updated, false else
+   * @param recoverPreemptedRequest recover ResourceRequest on preemption
+   * @return true if any resource was updated, false otherwise
    */
-  synchronized public boolean updateResourceRequests(
+  public synchronized boolean updateResourceRequests(
       List<ResourceRequest> requests, boolean recoverPreemptedRequest) {
-    QueueMetrics metrics = queue.getMetrics();
-
+    // Flag to track if any incoming requests update "ANY" requests
     boolean anyResourcesUpdated = false;
 
     // Update resource requests
     for (ResourceRequest request : requests) {
       Priority priority = request.getPriority();
       String resourceName = request.getResourceName();
-      boolean updatePendingResources = false;
-      ResourceRequest lastRequest = null;
+
+      // Update node labels if required
+      updateNodeLabels(request);
+
+      Map<String, ResourceRequest> asks = this.resourceRequestMap.get(priority);
+      if (asks == null) {
+        asks = new ConcurrentHashMap<>();
+        this.resourceRequestMap.put(priority, asks);
+        this.priorities.add(priority);
+      }
+
+      // Increment number of containers if recovering preempted resources
+      ResourceRequest lastRequest = asks.get(resourceName);
+      if (recoverPreemptedRequest && lastRequest != null) {
+        request.setNumContainers(lastRequest.getNumContainers() + 1);
+      }
+
+      // Update asks
+      asks.put(resourceName, request);
 
       if (resourceName.equals(ResourceRequest.ANY)) {
-        if (LOG.isDebugEnabled()) {
-          LOG.debug("update:" + " application=" + applicationId + " request="
-              + request);
-        }
-        updatePendingResources = true;
         anyResourcesUpdated = true;
 
-        // Premature optimization?
-        // Assumes that we won't see more than one priority request updated
-        // in one call, reasonable assumption... however, it's totally safe
-        // to activate same application more than once.
-        // Thus we don't need another loop ala the one in decrementOutstanding()
-        // which is needed during deactivate.
+        // Activate application. Metrics activation is done here.
+        // TODO: Shouldn't we activate even if numContainers = 0?
         if (request.getNumContainers() > 0) {
           activeUsersManager.activateApplication(user, applicationId);
         }
+
+        // Update pendingResources
+        updatePendingResources(lastRequest, request, queue.getMetrics());
+      }
+    }
+    return anyResourcesUpdated;
+  }
+
+  private void updatePendingResources(ResourceRequest lastRequest,
+      ResourceRequest request, QueueMetrics metrics) {
+    if (request.getNumContainers() <= 0) {
+      LOG.info("checking for deactivate of application :"
+          + this.applicationId);
+      checkForDeactivation();
+    }
+
+    int lastRequestContainers =
+        (lastRequest != null) ? lastRequest.getNumContainers() : 0;
+    Resource lastRequestCapability =
+        lastRequest != null ? lastRequest.getCapability() : Resources.none();
+    metrics.incrPendingResources(user,
+        request.getNumContainers(), request.getCapability());
+    metrics.decrPendingResources(user,
+        lastRequestContainers, lastRequestCapability);
+
+    // update queue:
+    Resource increasedResource =
+        Resources.multiply(request.getCapability(), request.getNumContainers());
+    queue.incPendingResource(request.getNodeLabelExpression(),
+        increasedResource);
+    appResourceUsage.incPending(request.getNodeLabelExpression(),
+        increasedResource);
+    if (lastRequest != null) {
+      Resource decreasedResource =
+          Resources.multiply(lastRequestCapability, lastRequestContainers);
+      queue.decPendingResource(lastRequest.getNodeLabelExpression(),
+          decreasedResource);
+      appResourceUsage.decPending(lastRequest.getNodeLabelExpression(),
+          decreasedResource);
+    }
+  }
+
+  private void updateNodeLabels(ResourceRequest request) {
+    Priority priority = request.getPriority();
+    String resourceName = request.getResourceName();
+    if (resourceName.equals(ResourceRequest.ANY)) {
       ResourceRequest previousAnyRequest =
           getResourceRequest(priority, resourceName);
 
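The extracted updatePendingResources() above applies a delta: pending totals are incremented by the new ask and decremented by whatever the previous ask at the same priority and resource name had contributed. A minimal standalone sketch of that bookkeeping, with a plain long standing in for Resource/QueueMetrics (hypothetical class, not part of the patch):

// Standalone sketch of the pending-resource accounting shown in the diff.
public class PendingResourceSketch {
  private long pendingMemoryMb = 0;

  public void updatePending(int lastContainers, long lastMemoryMbEach,
      int newContainers, long newMemoryMbEach) {
    // Add the new request's total, then subtract the old request's total,
    // so only the delta remains applied (mirrors incr/decrPendingResources).
    pendingMemoryMb += (long) newContainers * newMemoryMbEach;
    pendingMemoryMb -= (long) lastContainers * lastMemoryMbEach;
  }

  public static void main(String[] args) {
    PendingResourceSketch sketch = new PendingResourceSketch();
    sketch.updatePending(0, 0, 10, 1024);    // first ask: 10 x 1 GB
    sketch.updatePending(10, 1024, 4, 1024); // updated ask: 4 x 1 GB
    System.out.println(sketch.pendingMemoryMb + " MB pending"); // 4096 MB pending
  }
}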
@@ -339,7 +382,7 @@ public class AppSchedulingInfo {
       // update label for all resource requests already added of same
       // priority as ANY resource request.
       if ((null == previousAnyRequest)
-          || isRequestLabelChanged(previousAnyRequest, request)) {
+          || hasRequestLabelChanged(previousAnyRequest, request)) {
         Map<String, ResourceRequest> resourceRequest =
             getResourceRequests(priority);
         if (resourceRequest != null) {
@@ -357,63 +400,9 @@ public class AppSchedulingInfo {
         request.setNodeLabelExpression(anyRequest.getNodeLabelExpression());
       }
     }
-
-      Map<String, ResourceRequest> asks = this.resourceRequestMap.get(priority);
-
-      if (asks == null) {
-        asks = new ConcurrentHashMap<String, ResourceRequest>();
-        this.resourceRequestMap.put(priority, asks);
-        this.priorities.add(priority);
-      }
-      lastRequest = asks.get(resourceName);
-
-      if (recoverPreemptedRequest && lastRequest != null) {
-        // Increment the number of containers to 1, as it is recovering a
-        // single container.
-        request.setNumContainers(lastRequest.getNumContainers() + 1);
-      }
-
-      asks.put(resourceName, request);
-      if (updatePendingResources) {
-
-        // Similarly, deactivate application?
-        if (request.getNumContainers() <= 0) {
-          LOG.info("checking for deactivate of application :"
-              + this.applicationId);
-          checkForDeactivation();
-        }
-
-        int lastRequestContainers = lastRequest != null ? lastRequest
-            .getNumContainers() : 0;
-        Resource lastRequestCapability = lastRequest != null ? lastRequest
-            .getCapability() : Resources.none();
-        metrics.incrPendingResources(user, request.getNumContainers(),
-            request.getCapability());
-        metrics.decrPendingResources(user, lastRequestContainers,
-            lastRequestCapability);
-
-        // update queue:
-        Resource increasedResource =
-            Resources.multiply(request.getCapability(),
-                request.getNumContainers());
-        queue.incPendingResource(request.getNodeLabelExpression(),
-            increasedResource);
-        appResourceUsage.incPending(request.getNodeLabelExpression(),
-            increasedResource);
-        if (lastRequest != null) {
-          Resource decreasedResource =
-              Resources.multiply(lastRequestCapability, lastRequestContainers);
-          queue.decPendingResource(lastRequest.getNodeLabelExpression(),
-              decreasedResource);
-          appResourceUsage.decPending(lastRequest.getNodeLabelExpression(),
-              decreasedResource);
-        }
-      }
-    }
-    return anyResourcesUpdated;
   }
 
-  private boolean isRequestLabelChanged(ResourceRequest requestOne,
+  private boolean hasRequestLabelChanged(ResourceRequest requestOne,
       ResourceRequest requestTwo) {
     String requestOneLabelExp = requestOne.getNodeLabelExpression();
     String requestTwoLabelExp = requestTwo.getNodeLabelExpression();
@@ -465,24 +454,24 @@ public class AppSchedulingInfo {
     }
   }
 
-  synchronized public Collection<Priority> getPriorities() {
+  public synchronized Collection<Priority> getPriorities() {
     return priorities;
   }
 
-  synchronized public Map<String, ResourceRequest> getResourceRequests(
+  public synchronized Map<String, ResourceRequest> getResourceRequests(
       Priority priority) {
     return resourceRequestMap.get(priority);
   }
 
-  public List<ResourceRequest> getAllResourceRequests() {
-    List<ResourceRequest> ret = new ArrayList<ResourceRequest>();
+  public synchronized List<ResourceRequest> getAllResourceRequests() {
+    List<ResourceRequest> ret = new ArrayList<>();
     for (Map<String, ResourceRequest> r : resourceRequestMap.values()) {
       ret.addAll(r.values());
     }
     return ret;
   }
 
-  synchronized public ResourceRequest getResourceRequest(Priority priority,
+  public synchronized ResourceRequest getResourceRequest(Priority priority,
       String resourceName) {
     Map<String, ResourceRequest> nodeRequests = resourceRequestMap.get(priority);
     return (nodeRequests == null) ? null : nodeRequests.get(resourceName);
@@ -559,28 +548,17 @@ public class AppSchedulingInfo {
   /**
    * Resources have been allocated to this application by the resource
    * scheduler. Track them.
-   *
-   * @param type
-   *          the type of the node
-   * @param node
-   *          the nodeinfo of the node
-   * @param priority
-   *          the priority of the request.
-   * @param request
-   *          the request
-   * @param container
-   *          the containers allocated.
    */
-  synchronized public List<ResourceRequest> allocate(NodeType type,
+  public synchronized List<ResourceRequest> allocate(NodeType type,
       SchedulerNode node, Priority priority, ResourceRequest request,
-      Container container) {
-    List<ResourceRequest> resourceRequests = new ArrayList<ResourceRequest>();
+      Container containerAllocated) {
+    List<ResourceRequest> resourceRequests = new ArrayList<>();
     if (type == NodeType.NODE_LOCAL) {
-      allocateNodeLocal(node, priority, request, container, resourceRequests);
+      allocateNodeLocal(node, priority, request, resourceRequests);
     } else if (type == NodeType.RACK_LOCAL) {
-      allocateRackLocal(node, priority, request, container, resourceRequests);
+      allocateRackLocal(node, priority, request, resourceRequests);
     } else {
-      allocateOffSwitch(node, priority, request, container, resourceRequests);
+      allocateOffSwitch(request, resourceRequests);
     }
     QueueMetrics metrics = queue.getMetrics();
     if (pending) {
@@ -592,8 +570,8 @@ public class AppSchedulingInfo {
 
     if (LOG.isDebugEnabled()) {
       LOG.debug("allocate: applicationId=" + applicationId
-          + " container=" + container.getId()
-          + " host=" + container.getNodeId().toString()
+          + " container=" + containerAllocated.getId()
+          + " host=" + containerAllocated.getNodeId().toString()
           + " user=" + user
           + " resource=" + request.getCapability()
           + " type=" + type);
@@ -606,12 +584,9 @@ public class AppSchedulingInfo {
   /**
    * The {@link ResourceScheduler} is allocating data-local resources to the
    * application.
-   *
-   * @param allocatedContainers
-   *          resources allocated to the application
    */
-  synchronized private void allocateNodeLocal(SchedulerNode node,
-      Priority priority, ResourceRequest nodeLocalRequest, Container container,
+  private synchronized void allocateNodeLocal(SchedulerNode node,
+      Priority priority, ResourceRequest nodeLocalRequest,
       List<ResourceRequest> resourceRequests) {
     // Update future requirements
     decResourceRequest(node.getNodeName(), priority, nodeLocalRequest);
@@ -641,12 +616,9 @@ public class AppSchedulingInfo {
   /**
    * The {@link ResourceScheduler} is allocating data-local resources to the
    * application.
-   *
-   * @param allocatedContainers
-   *          resources allocated to the application
    */
-  synchronized private void allocateRackLocal(SchedulerNode node,
-      Priority priority, ResourceRequest rackLocalRequest, Container container,
+  private synchronized void allocateRackLocal(SchedulerNode node,
+      Priority priority, ResourceRequest rackLocalRequest,
       List<ResourceRequest> resourceRequests) {
     // Update future requirements
     decResourceRequest(node.getRackName(), priority, rackLocalRequest);
@@ -663,20 +635,16 @@ public class AppSchedulingInfo {
   /**
    * The {@link ResourceScheduler} is allocating data-local resources to the
    * application.
-   *
-   * @param allocatedContainers
-   *          resources allocated to the application
    */
-  synchronized private void allocateOffSwitch(SchedulerNode node,
-      Priority priority, ResourceRequest offSwitchRequest, Container container,
-      List<ResourceRequest> resourceRequests) {
+  private synchronized void allocateOffSwitch(
+      ResourceRequest offSwitchRequest, List<ResourceRequest> resourceRequests) {
     // Update future requirements
     decrementOutstanding(offSwitchRequest);
     // Update cloned OffRack requests for recovery
     resourceRequests.add(cloneResourceRequest(offSwitchRequest));
   }
 
-  synchronized private void decrementOutstanding(
+  private synchronized void decrementOutstanding(
       ResourceRequest offSwitchRequest) {
     int numOffSwitchContainers = offSwitchRequest.getNumContainers() - 1;
 
@@ -695,7 +663,7 @@ public class AppSchedulingInfo {
         offSwitchRequest.getCapability());
   }
 
-  synchronized private void checkForDeactivation() {
+  private synchronized void checkForDeactivation() {
     boolean deactivate = true;
     for (Priority priority : getPriorities()) {
       ResourceRequest request = getResourceRequest(priority, ResourceRequest.ANY);
@@ -709,7 +677,7 @@ public class AppSchedulingInfo {
 
     // also we need to check increase request
     if (!deactivate) {
-      deactivate = increaseRequestMap.isEmpty();
+      deactivate = containerIncreaseRequestMap.isEmpty();
     }
 
     if (deactivate) {
@@ -717,7 +685,7 @@ public class AppSchedulingInfo {
     }
   }
 
-  synchronized public void move(Queue newQueue) {
+  public synchronized void move(Queue newQueue) {
     QueueMetrics oldMetrics = queue.getMetrics();
     QueueMetrics newMetrics = newQueue.getMetrics();
     for (Map<String, ResourceRequest> asks : resourceRequestMap.values()) {
@@ -741,10 +709,9 @@ public class AppSchedulingInfo {
     activeUsersManager = newQueue.getActiveUsersManager();
     activeUsersManager.activateApplication(user, applicationId);
     this.queue = newQueue;
-    this.queueName = newQueue.getQueueName();
   }
 
-  synchronized public void stop(RMAppAttemptState rmAppAttemptFinalState) {
+  public synchronized void stop() {
     // clear pending resources metrics for the application
     QueueMetrics metrics = queue.getMetrics();
     for (Map<String, ResourceRequest> asks : resourceRequestMap.values()) {
@@ -782,12 +749,8 @@ public class AppSchedulingInfo {
 
   public synchronized void transferStateFromPreviousAppSchedulingInfo(
       AppSchedulingInfo appInfo) {
-    // this.priorities = appInfo.getPriorities();
-    // this.requests = appInfo.getRequests();
     // This should not require locking the userBlacklist since it will not be
     // used by this instance until after setCurrentAppAttempt.
-    // Should cleanup this to avoid sharing between instances and can
-    // then remove getBlacklist as well.
     this.userBlacklist = appInfo.getBlackList();
   }
 
org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java

@@ -331,7 +331,7 @@ public class SchedulerApplicationAttempt implements SchedulableEntity {
   public synchronized void stop(RMAppAttemptState rmAppAttemptFinalState) {
     // Cleanup all scheduling information
     isStopped = true;
-    appSchedulingInfo.stop(rmAppAttemptFinalState);
+    appSchedulingInfo.stop();
   }
 
   public synchronized boolean isStopped() {