YARN-3142. Improve locks in AppSchedulingInfo. (Varun Saxena via wangda)

This commit is contained in:
Wangda Tan 2016-09-27 11:54:55 -07:00
parent 875062b5bc
commit 1831be8e73
1 changed file with 359 additions and 266 deletions

View File

@ -30,6 +30,7 @@
import java.util.concurrent.ConcurrentSkipListMap; import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
@ -67,7 +68,8 @@ public class AppSchedulingInfo {
private Queue queue; private Queue queue;
private ActiveUsersManager activeUsersManager; private ActiveUsersManager activeUsersManager;
private boolean pending = true; // whether accepted/allocated by scheduler // whether accepted/allocated by scheduler
private volatile boolean pending = true;
private ResourceUsage appResourceUsage; private ResourceUsage appResourceUsage;
private AtomicBoolean userBlacklistChanged = new AtomicBoolean(false); private AtomicBoolean userBlacklistChanged = new AtomicBoolean(false);
@ -86,6 +88,9 @@ public class AppSchedulingInfo {
SchedContainerChangeRequest>>> containerIncreaseRequestMap = SchedContainerChangeRequest>>> containerIncreaseRequestMap =
new ConcurrentHashMap<>(); new ConcurrentHashMap<>();
private final ReentrantReadWriteLock.ReadLock readLock;
private final ReentrantReadWriteLock.WriteLock writeLock;
public AppSchedulingInfo(ApplicationAttemptId appAttemptId, public AppSchedulingInfo(ApplicationAttemptId appAttemptId,
String user, Queue queue, ActiveUsersManager activeUsersManager, String user, Queue queue, ActiveUsersManager activeUsersManager,
long epoch, ResourceUsage appResourceUsage) { long epoch, ResourceUsage appResourceUsage) {
@ -97,6 +102,10 @@ public AppSchedulingInfo(ApplicationAttemptId appAttemptId,
this.containerIdCounter = new AtomicLong( this.containerIdCounter = new AtomicLong(
epoch << ResourceManager.EPOCH_BIT_SHIFT); epoch << ResourceManager.EPOCH_BIT_SHIFT);
this.appResourceUsage = appResourceUsage; this.appResourceUsage = appResourceUsage;
ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
readLock = lock.readLock();
writeLock = lock.writeLock();
} }
public ApplicationId getApplicationId() { public ApplicationId getApplicationId() {
@ -115,11 +124,16 @@ public long getNewContainerId() {
return this.containerIdCounter.incrementAndGet(); return this.containerIdCounter.incrementAndGet();
} }
public synchronized String getQueueName() { public String getQueueName() {
return queue.getQueueName(); try {
this.readLock.lock();
return queue.getQueueName();
} finally {
this.readLock.unlock();
}
} }
public synchronized boolean isPending() { public boolean isPending() {
return pending; return pending;
} }
@ -130,88 +144,103 @@ public Set<String> getRequestedPartitions() {
/** /**
* Clear any pending requests from this application. * Clear any pending requests from this application.
*/ */
private synchronized void clearRequests() { private void clearRequests() {
schedulerKeys.clear(); schedulerKeys.clear();
resourceRequestMap.clear(); resourceRequestMap.clear();
LOG.info("Application " + applicationId + " requests cleared"); LOG.info("Application " + applicationId + " requests cleared");
} }
public synchronized boolean hasIncreaseRequest(NodeId nodeId) { public boolean hasIncreaseRequest(NodeId nodeId) {
Map<SchedulerRequestKey, Map<ContainerId, SchedContainerChangeRequest>> try {
requestsOnNode = containerIncreaseRequestMap.get(nodeId); this.readLock.lock();
return requestsOnNode == null ? false : requestsOnNode.size() > 0; Map<SchedulerRequestKey, Map<ContainerId, SchedContainerChangeRequest>>
requestsOnNode = containerIncreaseRequestMap.get(nodeId);
return requestsOnNode == null ? false : requestsOnNode.size() > 0;
} finally {
this.readLock.unlock();
}
} }
public synchronized Map<ContainerId, SchedContainerChangeRequest> public Map<ContainerId, SchedContainerChangeRequest>
getIncreaseRequests(NodeId nodeId, SchedulerRequestKey schedulerKey) { getIncreaseRequests(NodeId nodeId, SchedulerRequestKey schedulerKey) {
Map<SchedulerRequestKey, Map<ContainerId, SchedContainerChangeRequest>> try {
requestsOnNode = containerIncreaseRequestMap.get(nodeId); this.readLock.lock();
return requestsOnNode == null ? null : requestsOnNode.get( Map<SchedulerRequestKey, Map<ContainerId, SchedContainerChangeRequest>>
schedulerKey); requestsOnNode = containerIncreaseRequestMap.get(nodeId);
return requestsOnNode == null ? null : requestsOnNode.get(
schedulerKey);
} finally {
this.readLock.unlock();
}
} }
/** /**
* return true if any of the existing increase requests are updated, * return true if any of the existing increase requests are updated,
* false if none of them are updated * false if none of them are updated
*/ */
public synchronized boolean updateIncreaseRequests( public boolean updateIncreaseRequests(
List<SchedContainerChangeRequest> increaseRequests) { List<SchedContainerChangeRequest> increaseRequests) {
boolean resourceUpdated = false; boolean resourceUpdated = false;
for (SchedContainerChangeRequest r : increaseRequests) { try {
if (r.getRMContainer().getState() != RMContainerState.RUNNING) { this.writeLock.lock();
LOG.warn("rmContainer's state is not RUNNING, for increase request with" for (SchedContainerChangeRequest r : increaseRequests) {
+ " container-id=" + r.getContainerId()); if (r.getRMContainer().getState() != RMContainerState.RUNNING) {
continue; LOG.warn("rmContainer's state is not RUNNING, for increase request"
} + " with container-id=" + r.getContainerId());
try { continue;
RMServerUtils.checkSchedContainerChangeRequest(r, true); }
} catch (YarnException e) { try {
LOG.warn("Error happens when checking increase request, Ignoring.." RMServerUtils.checkSchedContainerChangeRequest(r, true);
+ " exception=", e); } catch (YarnException e) {
continue; LOG.warn("Error happens when checking increase request, Ignoring.."
} + " exception=", e);
NodeId nodeId = r.getRMContainer().getAllocatedNode(); continue;
}
NodeId nodeId = r.getRMContainer().getAllocatedNode();
Map<SchedulerRequestKey, Map<ContainerId, SchedContainerChangeRequest>> Map<SchedulerRequestKey, Map<ContainerId, SchedContainerChangeRequest>>
requestsOnNode = containerIncreaseRequestMap.get(nodeId); requestsOnNode = containerIncreaseRequestMap.get(nodeId);
if (null == requestsOnNode) { if (null == requestsOnNode) {
requestsOnNode = new TreeMap<>(); requestsOnNode = new TreeMap<>();
containerIncreaseRequestMap.put(nodeId, requestsOnNode); containerIncreaseRequestMap.put(nodeId, requestsOnNode);
} }
SchedContainerChangeRequest prevChangeRequest = SchedContainerChangeRequest prevChangeRequest =
getIncreaseRequest(nodeId, getIncreaseRequest(nodeId,
r.getRMContainer().getAllocatedSchedulerKey(), r.getRMContainer().getAllocatedSchedulerKey(),
r.getContainerId()); r.getContainerId());
if (null != prevChangeRequest) { if (null != prevChangeRequest) {
if (Resources.equals(prevChangeRequest.getTargetCapacity(), if (Resources.equals(prevChangeRequest.getTargetCapacity(),
r.getTargetCapacity())) { r.getTargetCapacity())) {
// increase request hasn't changed // increase request hasn't changed
continue;
}
// remove the old one, as we will use the new one going forward
removeIncreaseRequest(nodeId,
prevChangeRequest.getRMContainer().getAllocatedSchedulerKey(),
prevChangeRequest.getContainerId());
}
if (Resources.equals(r.getTargetCapacity(),
r.getRMContainer().getAllocatedResource())) {
if (LOG.isDebugEnabled()) {
LOG.debug("Trying to increase container " + r.getContainerId()
+ ", target capacity = previous capacity = " + prevChangeRequest
+ ". Will ignore this increase request.");
}
continue; continue;
} }
// remove the old one, as we will use the new one going forward // add the new one
removeIncreaseRequest(nodeId, resourceUpdated = true;
prevChangeRequest.getRMContainer().getAllocatedSchedulerKey(), insertIncreaseRequest(r);
prevChangeRequest.getContainerId());
} }
return resourceUpdated;
if (Resources.equals(r.getTargetCapacity(), } finally {
r.getRMContainer().getAllocatedResource())) { this.writeLock.unlock();
if (LOG.isDebugEnabled()) {
LOG.debug("Trying to increase container " + r.getContainerId()
+ ", target capacity = previous capacity = " + prevChangeRequest
+ ". Will ignore this increase request.");
}
continue;
}
// add the new one
resourceUpdated = true;
insertIncreaseRequest(r);
} }
return resourceUpdated;
} }
/** /**
@ -275,61 +304,71 @@ private void decrementSchedulerKeyReference(
} }
} }
public synchronized boolean removeIncreaseRequest(NodeId nodeId, public boolean removeIncreaseRequest(NodeId nodeId,
SchedulerRequestKey schedulerKey, ContainerId containerId) { SchedulerRequestKey schedulerKey, ContainerId containerId) {
Map<SchedulerRequestKey, Map<ContainerId, SchedContainerChangeRequest>> try {
requestsOnNode = containerIncreaseRequestMap.get(nodeId); this.writeLock.lock();
if (null == requestsOnNode) { Map<SchedulerRequestKey, Map<ContainerId, SchedContainerChangeRequest>>
return false; requestsOnNode = containerIncreaseRequestMap.get(nodeId);
if (null == requestsOnNode) {
return false;
}
Map<ContainerId, SchedContainerChangeRequest> requestsOnNodeWithPriority =
requestsOnNode.get(schedulerKey);
if (null == requestsOnNodeWithPriority) {
return false;
}
SchedContainerChangeRequest request =
requestsOnNodeWithPriority.remove(containerId);
// remove hierarchies if it becomes empty
if (requestsOnNodeWithPriority.isEmpty()) {
requestsOnNode.remove(schedulerKey);
decrementSchedulerKeyReference(schedulerKey);
}
if (requestsOnNode.isEmpty()) {
containerIncreaseRequestMap.remove(nodeId);
}
if (request == null) {
return false;
}
// update queue's pending resource if request exists
String partition = request.getRMContainer().getNodeLabelExpression();
Resource delta = request.getDeltaCapacity();
appResourceUsage.decPending(partition, delta);
queue.decPendingResource(partition, delta);
if (LOG.isDebugEnabled()) {
LOG.debug("remove increase request:" + request);
}
return true;
} finally {
this.writeLock.unlock();
} }
Map<ContainerId, SchedContainerChangeRequest> requestsOnNodeWithPriority =
requestsOnNode.get(schedulerKey);
if (null == requestsOnNodeWithPriority) {
return false;
}
SchedContainerChangeRequest request =
requestsOnNodeWithPriority.remove(containerId);
// remove hierarchies if it becomes empty
if (requestsOnNodeWithPriority.isEmpty()) {
requestsOnNode.remove(schedulerKey);
decrementSchedulerKeyReference(schedulerKey);
}
if (requestsOnNode.isEmpty()) {
containerIncreaseRequestMap.remove(nodeId);
}
if (request == null) {
return false;
}
// update queue's pending resource if request exists
String partition = request.getRMContainer().getNodeLabelExpression();
Resource delta = request.getDeltaCapacity();
appResourceUsage.decPending(partition, delta);
queue.decPendingResource(partition, delta);
if (LOG.isDebugEnabled()) {
LOG.debug("remove increase request:" + request);
}
return true;
} }
public SchedContainerChangeRequest getIncreaseRequest(NodeId nodeId, public SchedContainerChangeRequest getIncreaseRequest(NodeId nodeId,
SchedulerRequestKey schedulerKey, ContainerId containerId) { SchedulerRequestKey schedulerKey, ContainerId containerId) {
Map<SchedulerRequestKey, Map<ContainerId, SchedContainerChangeRequest>> try {
requestsOnNode = containerIncreaseRequestMap.get(nodeId); this.readLock.lock();
if (null == requestsOnNode) { Map<SchedulerRequestKey, Map<ContainerId, SchedContainerChangeRequest>>
return null; requestsOnNode = containerIncreaseRequestMap.get(nodeId);
} if (null == requestsOnNode) {
return null;
}
Map<ContainerId, SchedContainerChangeRequest> requestsOnNodeWithPriority = Map<ContainerId, SchedContainerChangeRequest> requestsOnNodeWithPriority =
requestsOnNode.get(schedulerKey); requestsOnNode.get(schedulerKey);
return requestsOnNodeWithPriority == null ? null return requestsOnNodeWithPriority == null ? null
: requestsOnNodeWithPriority.get(containerId); : requestsOnNodeWithPriority.get(containerId);
} finally {
this.readLock.unlock();
}
} }
/** /**
@ -343,49 +382,54 @@ public SchedContainerChangeRequest getIncreaseRequest(NodeId nodeId,
* recover ResourceRequest on preemption * recover ResourceRequest on preemption
* @return true if any resource was updated, false otherwise * @return true if any resource was updated, false otherwise
*/ */
public synchronized boolean updateResourceRequests( public boolean updateResourceRequests(List<ResourceRequest> requests,
List<ResourceRequest> requests,
boolean recoverPreemptedRequestForAContainer) { boolean recoverPreemptedRequestForAContainer) {
// Flag to track if any incoming requests update "ANY" requests // Flag to track if any incoming requests update "ANY" requests
boolean anyResourcesUpdated = false; boolean anyResourcesUpdated = false;
// Update resource requests try {
for (ResourceRequest request : requests) { this.writeLock.lock();
SchedulerRequestKey schedulerKey = SchedulerRequestKey.create(request); // Update resource requests
String resourceName = request.getResourceName(); for (ResourceRequest request : requests) {
SchedulerRequestKey schedulerKey = SchedulerRequestKey.create(request);
String resourceName = request.getResourceName();
// Update node labels if required // Update node labels if required
updateNodeLabels(request); updateNodeLabels(request);
Map<String, ResourceRequest> asks = Map<String, ResourceRequest> asks =
this.resourceRequestMap.get(schedulerKey); this.resourceRequestMap.get(schedulerKey);
if (asks == null) { if (asks == null) {
asks = new ConcurrentHashMap<>(); asks = new ConcurrentHashMap<>();
this.resourceRequestMap.put(schedulerKey, asks); this.resourceRequestMap.put(schedulerKey, asks);
} }
// Increment number of containers if recovering preempted resources // Increment number of containers if recovering preempted resources
ResourceRequest lastRequest = asks.get(resourceName); ResourceRequest lastRequest = asks.get(resourceName);
if (recoverPreemptedRequestForAContainer && lastRequest != null) { if (recoverPreemptedRequestForAContainer && lastRequest != null) {
request.setNumContainers(lastRequest.getNumContainers() + 1); request.setNumContainers(lastRequest.getNumContainers() + 1);
} }
// Update asks // Update asks
asks.put(resourceName, request); asks.put(resourceName, request);
if (resourceName.equals(ResourceRequest.ANY)) { if (resourceName.equals(ResourceRequest.ANY)) {
//update the applications requested labels set //update the applications requested labels set
requestedPartitions.add(request.getNodeLabelExpression() == null requestedPartitions.add(request.getNodeLabelExpression() == null
? RMNodeLabelsManager.NO_LABEL : request.getNodeLabelExpression()); ? RMNodeLabelsManager.NO_LABEL :
request.getNodeLabelExpression());
anyResourcesUpdated = true;
anyResourcesUpdated = true;
// Update pendingResources
updatePendingResources(lastRequest, request, schedulerKey, // Update pendingResources
queue.getMetrics()); updatePendingResources(lastRequest, request, schedulerKey,
queue.getMetrics());
}
} }
return anyResourcesUpdated;
} finally {
this.writeLock.unlock();
} }
return anyResourcesUpdated;
} }
private void updatePendingResources(ResourceRequest lastRequest, private void updatePendingResources(ResourceRequest lastRequest,
@ -529,34 +573,49 @@ public boolean getAndResetBlacklistChanged() {
return userBlacklistChanged.getAndSet(false); return userBlacklistChanged.getAndSet(false);
} }
public synchronized Collection<SchedulerRequestKey> getSchedulerKeys() { public Collection<SchedulerRequestKey> getSchedulerKeys() {
return schedulerKeys.keySet(); return schedulerKeys.keySet();
} }
public synchronized Map<String, ResourceRequest> getResourceRequests( public Map<String, ResourceRequest> getResourceRequests(
SchedulerRequestKey schedulerKey) { SchedulerRequestKey schedulerKey) {
return resourceRequestMap.get(schedulerKey); return resourceRequestMap.get(schedulerKey);
} }
public synchronized List<ResourceRequest> getAllResourceRequests() { public List<ResourceRequest> getAllResourceRequests() {
List<ResourceRequest> ret = new ArrayList<>(); List<ResourceRequest> ret = new ArrayList<>();
for (Map<String, ResourceRequest> r : resourceRequestMap.values()) { try {
ret.addAll(r.values()); this.readLock.lock();
for (Map<String, ResourceRequest> r : resourceRequestMap.values()) {
ret.addAll(r.values());
}
} finally {
this.readLock.unlock();
} }
return ret; return ret;
} }
public synchronized ResourceRequest getResourceRequest( public ResourceRequest getResourceRequest(SchedulerRequestKey schedulerKey,
SchedulerRequestKey schedulerKey, String resourceName) { String resourceName) {
Map<String, ResourceRequest> nodeRequests = try {
resourceRequestMap.get(schedulerKey); this.readLock.lock();
return (nodeRequests == null) ? null : nodeRequests.get(resourceName); Map<String, ResourceRequest> nodeRequests =
resourceRequestMap.get(schedulerKey);
return (nodeRequests == null) ? null : nodeRequests.get(resourceName);
} finally {
this.readLock.unlock();
}
} }
public synchronized Resource getResource(SchedulerRequestKey schedulerKey) { public Resource getResource(SchedulerRequestKey schedulerKey) {
ResourceRequest request = try {
getResourceRequest(schedulerKey, ResourceRequest.ANY); this.readLock.lock();
return (request == null) ? null : request.getCapability(); ResourceRequest request =
getResourceRequest(schedulerKey, ResourceRequest.ANY);
return (request == null) ? null : request.getCapability();
} finally {
this.readLock.unlock();
}
} }
/** /**
@ -582,8 +641,7 @@ public boolean isPlaceBlacklisted(String resourceName,
} }
} }
public synchronized void increaseContainer( public void increaseContainer(SchedContainerChangeRequest increaseRequest) {
SchedContainerChangeRequest increaseRequest) {
NodeId nodeId = increaseRequest.getNodeId(); NodeId nodeId = increaseRequest.getNodeId();
SchedulerRequestKey schedulerKey = SchedulerRequestKey schedulerKey =
increaseRequest.getRMContainer().getAllocatedSchedulerKey(); increaseRequest.getRMContainer().getAllocatedSchedulerKey();
@ -596,16 +654,21 @@ public synchronized void increaseContainer(
+ increaseRequest.getNodeId() + " user=" + user + " resource=" + increaseRequest.getNodeId() + " user=" + user + " resource="
+ deltaCapacity); + deltaCapacity);
} }
// Set queue metrics try {
queue.getMetrics().allocateResources(user, deltaCapacity); this.writeLock.lock();
// remove the increase request from pending increase request map // Set queue metrics
removeIncreaseRequest(nodeId, schedulerKey, containerId); queue.getMetrics().allocateResources(user, deltaCapacity);
// update usage // remove the increase request from pending increase request map
appResourceUsage.incUsed(increaseRequest.getNodePartition(), deltaCapacity); removeIncreaseRequest(nodeId, schedulerKey, containerId);
// update usage
appResourceUsage.incUsed(increaseRequest.getNodePartition(),
deltaCapacity);
} finally {
this.writeLock.unlock();
}
} }
public synchronized void decreaseContainer( public void decreaseContainer(SchedContainerChangeRequest decreaseRequest) {
SchedContainerChangeRequest decreaseRequest) {
// Delta is negative when it's a decrease request // Delta is negative when it's a decrease request
Resource absDelta = Resources.negate(decreaseRequest.getDeltaCapacity()); Resource absDelta = Resources.negate(decreaseRequest.getDeltaCapacity());
@ -616,11 +679,16 @@ public synchronized void decreaseContainer(
+ absDelta); + absDelta);
} }
// Set queue metrics try {
queue.getMetrics().releaseResources(user, absDelta); this.writeLock.lock();
// Set queue metrics
queue.getMetrics().releaseResources(user, absDelta);
// update usage // update usage
appResourceUsage.decUsed(decreaseRequest.getNodePartition(), absDelta); appResourceUsage.decUsed(decreaseRequest.getNodePartition(), absDelta);
} finally {
this.writeLock.unlock();
}
} }
/** /**
@ -633,43 +701,48 @@ public synchronized void decreaseContainer(
* @param containerAllocated Container Allocated * @param containerAllocated Container Allocated
* @return List of ResourceRequests * @return List of ResourceRequests
*/ */
public synchronized List<ResourceRequest> allocate(NodeType type, public List<ResourceRequest> allocate(NodeType type, SchedulerNode node,
SchedulerNode node, SchedulerRequestKey schedulerKey, SchedulerRequestKey schedulerKey, ResourceRequest request,
ResourceRequest request, Container containerAllocated) { Container containerAllocated) {
List<ResourceRequest> resourceRequests = new ArrayList<>(); List<ResourceRequest> resourceRequests = new ArrayList<>();
if (type == NodeType.NODE_LOCAL) { try {
allocateNodeLocal(node, schedulerKey, request, resourceRequests); this.writeLock.lock();
} else if (type == NodeType.RACK_LOCAL) { if (type == NodeType.NODE_LOCAL) {
allocateRackLocal(node, schedulerKey, request, resourceRequests); allocateNodeLocal(node, schedulerKey, request, resourceRequests);
} else { } else if (type == NodeType.RACK_LOCAL) {
allocateOffSwitch(request, resourceRequests, schedulerKey); allocateRackLocal(node, schedulerKey, request, resourceRequests);
} } else {
QueueMetrics metrics = queue.getMetrics(); allocateOffSwitch(request, resourceRequests, schedulerKey);
if (pending) { }
// once an allocation is done we assume the application is QueueMetrics metrics = queue.getMetrics();
// running from scheduler's POV. if (pending) {
pending = false; // once an allocation is done we assume the application is
metrics.runAppAttempt(applicationId, user); // running from scheduler's POV.
} pending = false;
metrics.runAppAttempt(applicationId, user);
}
if (LOG.isDebugEnabled()) { if (LOG.isDebugEnabled()) {
LOG.debug("allocate: applicationId=" + applicationId LOG.debug("allocate: applicationId=" + applicationId
+ " container=" + containerAllocated.getId() + " container=" + containerAllocated.getId()
+ " host=" + containerAllocated.getNodeId().toString() + " host=" + containerAllocated.getNodeId().toString()
+ " user=" + user + " user=" + user
+ " resource=" + request.getCapability() + " resource=" + request.getCapability()
+ " type=" + type); + " type=" + type);
}
metrics.allocateResources(user, 1, request.getCapability(), true);
metrics.incrNodeTypeAggregations(user, type);
return resourceRequests;
} finally {
this.writeLock.unlock();
} }
metrics.allocateResources(user, 1, request.getCapability(), true);
metrics.incrNodeTypeAggregations(user, type);
return resourceRequests;
} }
/** /**
* The {@link ResourceScheduler} is allocating data-local resources to the * The {@link ResourceScheduler} is allocating data-local resources to the
* application. * application.
*/ */
private synchronized void allocateNodeLocal(SchedulerNode node, private void allocateNodeLocal(SchedulerNode node,
SchedulerRequestKey schedulerKey, ResourceRequest nodeLocalRequest, SchedulerRequestKey schedulerKey, ResourceRequest nodeLocalRequest,
List<ResourceRequest> resourceRequests) { List<ResourceRequest> resourceRequests) {
// Update future requirements // Update future requirements
@ -701,7 +774,7 @@ private void decResourceRequest(String resourceName,
* The {@link ResourceScheduler} is allocating rack-local resources to the * The {@link ResourceScheduler} is allocating rack-local resources to the
* application. * application.
*/ */
private synchronized void allocateRackLocal(SchedulerNode node, private void allocateRackLocal(SchedulerNode node,
SchedulerRequestKey schedulerKey, ResourceRequest rackLocalRequest, SchedulerRequestKey schedulerKey, ResourceRequest rackLocalRequest,
List<ResourceRequest> resourceRequests) { List<ResourceRequest> resourceRequests) {
// Update future requirements // Update future requirements
@ -720,8 +793,8 @@ private synchronized void allocateRackLocal(SchedulerNode node,
* The {@link ResourceScheduler} is allocating off-switch resources to the * The {@link ResourceScheduler} is allocating off-switch resources to the
* application. * application.
*/ */
private synchronized void allocateOffSwitch( private void allocateOffSwitch(ResourceRequest offSwitchRequest,
ResourceRequest offSwitchRequest, List<ResourceRequest> resourceRequests, List<ResourceRequest> resourceRequests,
SchedulerRequestKey schedulerKey) { SchedulerRequestKey schedulerKey) {
// Update future requirements // Update future requirements
decrementOutstanding(offSwitchRequest, schedulerKey); decrementOutstanding(offSwitchRequest, schedulerKey);
@ -729,8 +802,8 @@ private synchronized void allocateOffSwitch(
resourceRequests.add(cloneResourceRequest(offSwitchRequest)); resourceRequests.add(cloneResourceRequest(offSwitchRequest));
} }
private synchronized void decrementOutstanding( private void decrementOutstanding(ResourceRequest offSwitchRequest,
ResourceRequest offSwitchRequest, SchedulerRequestKey schedulerKey) { SchedulerRequestKey schedulerKey) {
int numOffSwitchContainers = offSwitchRequest.getNumContainers() - 1; int numOffSwitchContainers = offSwitchRequest.getNumContainers() - 1;
// Do not remove ANY // Do not remove ANY
@ -749,65 +822,80 @@ private synchronized void decrementOutstanding(
offSwitchRequest.getCapability()); offSwitchRequest.getCapability());
} }
private synchronized void checkForDeactivation() { private void checkForDeactivation() {
if (schedulerKeys.isEmpty()) { if (schedulerKeys.isEmpty()) {
activeUsersManager.deactivateApplication(user, applicationId); activeUsersManager.deactivateApplication(user, applicationId);
} }
} }
public synchronized void move(Queue newQueue) { public void move(Queue newQueue) {
QueueMetrics oldMetrics = queue.getMetrics(); try {
QueueMetrics newMetrics = newQueue.getMetrics(); this.writeLock.lock();
for (Map<String, ResourceRequest> asks : resourceRequestMap.values()) { QueueMetrics oldMetrics = queue.getMetrics();
ResourceRequest request = asks.get(ResourceRequest.ANY); QueueMetrics newMetrics = newQueue.getMetrics();
if (request != null) { for (Map<String, ResourceRequest> asks : resourceRequestMap.values()) {
oldMetrics.decrPendingResources(user, request.getNumContainers(), ResourceRequest request = asks.get(ResourceRequest.ANY);
request.getCapability()); if (request != null) {
newMetrics.incrPendingResources(user, request.getNumContainers(), oldMetrics.decrPendingResources(user, request.getNumContainers(),
request.getCapability()); request.getCapability());
newMetrics.incrPendingResources(user, request.getNumContainers(),
request.getCapability());
Resource delta = Resources.multiply(request.getCapability(), Resource delta = Resources.multiply(request.getCapability(),
request.getNumContainers()); request.getNumContainers());
// Update Queue // Update Queue
queue.decPendingResource(request.getNodeLabelExpression(), delta); queue.decPendingResource(request.getNodeLabelExpression(), delta);
newQueue.incPendingResource(request.getNodeLabelExpression(), delta); newQueue.incPendingResource(request.getNodeLabelExpression(), delta);
}
} }
oldMetrics.moveAppFrom(this);
newMetrics.moveAppTo(this);
activeUsersManager.deactivateApplication(user, applicationId);
activeUsersManager = newQueue.getActiveUsersManager();
activeUsersManager.activateApplication(user, applicationId);
this.queue = newQueue;
} finally {
this.writeLock.unlock();
} }
oldMetrics.moveAppFrom(this);
newMetrics.moveAppTo(this);
activeUsersManager.deactivateApplication(user, applicationId);
activeUsersManager = newQueue.getActiveUsersManager();
activeUsersManager.activateApplication(user, applicationId);
this.queue = newQueue;
} }
public synchronized void stop() { public void stop() {
// clear pending resources metrics for the application // clear pending resources metrics for the application
QueueMetrics metrics = queue.getMetrics(); try {
for (Map<String, ResourceRequest> asks : resourceRequestMap.values()) { this.writeLock.lock();
ResourceRequest request = asks.get(ResourceRequest.ANY); QueueMetrics metrics = queue.getMetrics();
if (request != null) { for (Map<String, ResourceRequest> asks : resourceRequestMap.values()) {
metrics.decrPendingResources(user, request.getNumContainers(), ResourceRequest request = asks.get(ResourceRequest.ANY);
request.getCapability()); if (request != null) {
metrics.decrPendingResources(user, request.getNumContainers(),
request.getCapability());
// Update Queue // Update Queue
queue.decPendingResource( queue.decPendingResource(
request.getNodeLabelExpression(), request.getNodeLabelExpression(),
Resources.multiply(request.getCapability(), Resources.multiply(request.getCapability(),
request.getNumContainers())); request.getNumContainers()));
}
} }
metrics.finishAppAttempt(applicationId, pending, user);
// Clear requests themselves
clearRequests();
} finally {
this.writeLock.unlock();
} }
metrics.finishAppAttempt(applicationId, pending, user);
// Clear requests themselves
clearRequests();
} }
public synchronized void setQueue(Queue queue) { public void setQueue(Queue queue) {
this.queue = queue; try {
this.writeLock.lock();
this.queue = queue;
} finally {
this.writeLock.unlock();
}
} }
public Set<String> getBlackList() { private Set<String> getBlackList() {
return this.placesBlacklistedByApp; return this.placesBlacklistedByApp;
} }
@ -817,29 +905,34 @@ public Set<String> getBlackListCopy() {
} }
} }
public synchronized void transferStateFromPreviousAppSchedulingInfo( public void transferStateFromPreviousAppSchedulingInfo(
AppSchedulingInfo appInfo) { AppSchedulingInfo appInfo) {
// This should not require locking the userBlacklist since it will not be // This should not require locking the placesBlacklistedByApp since it will
// used by this instance until after setCurrentAppAttempt. // not be used by this instance until after setCurrentAppAttempt.
this.placesBlacklistedByApp = appInfo.getBlackList(); this.placesBlacklistedByApp = appInfo.getBlackList();
} }
public synchronized void recoverContainer(RMContainer rmContainer) { public void recoverContainer(RMContainer rmContainer) {
QueueMetrics metrics = queue.getMetrics(); try {
if (pending) { this.writeLock.lock();
// If there was any container to recover, the application was QueueMetrics metrics = queue.getMetrics();
// running from scheduler's POV. if (pending) {
pending = false; // If there was any container to recover, the application was
metrics.runAppAttempt(applicationId, user); // running from scheduler's POV.
} pending = false;
metrics.runAppAttempt(applicationId, user);
}
// Container is completed. Skip recovering resources. // Container is completed. Skip recovering resources.
if (rmContainer.getState().equals(RMContainerState.COMPLETED)) { if (rmContainer.getState().equals(RMContainerState.COMPLETED)) {
return; return;
} }
metrics.allocateResources(user, 1, rmContainer.getAllocatedResource(), metrics.allocateResources(user, 1, rmContainer.getAllocatedResource(),
false); false);
} finally {
this.writeLock.unlock();
}
} }
public ResourceRequest cloneResourceRequest(ResourceRequest request) { public ResourceRequest cloneResourceRequest(ResourceRequest request) {