From 1831be8e737fd423a9f3d590767b944147e85641 Mon Sep 17 00:00:00 2001 From: Wangda Tan Date: Tue, 27 Sep 2016 11:54:55 -0700 Subject: [PATCH] YARN-3142. Improve locks in AppSchedulingInfo. (Varun Saxena via wangda) --- .../scheduler/AppSchedulingInfo.java | 625 ++++++++++-------- 1 file changed, 359 insertions(+), 266 deletions(-) diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java index 39820f7860b..59a6650d374 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java @@ -30,6 +30,7 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentSkipListMap; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.locks.ReentrantReadWriteLock; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -67,7 +68,8 @@ public class AppSchedulingInfo { private Queue queue; private ActiveUsersManager activeUsersManager; - private boolean pending = true; // whether accepted/allocated by scheduler + // whether accepted/allocated by scheduler + private volatile boolean pending = true; private ResourceUsage appResourceUsage; private AtomicBoolean userBlacklistChanged = new AtomicBoolean(false); @@ -86,6 +88,9 @@ public class AppSchedulingInfo { SchedContainerChangeRequest>>> containerIncreaseRequestMap = new ConcurrentHashMap<>(); + private final ReentrantReadWriteLock.ReadLock readLock; + private final ReentrantReadWriteLock.WriteLock writeLock; + public AppSchedulingInfo(ApplicationAttemptId appAttemptId, String user, Queue queue, ActiveUsersManager activeUsersManager, long epoch, ResourceUsage appResourceUsage) { @@ -97,6 +102,10 @@ public class AppSchedulingInfo { this.containerIdCounter = new AtomicLong( epoch << ResourceManager.EPOCH_BIT_SHIFT); this.appResourceUsage = appResourceUsage; + + ReentrantReadWriteLock lock = new ReentrantReadWriteLock(); + readLock = lock.readLock(); + writeLock = lock.writeLock(); } public ApplicationId getApplicationId() { @@ -115,14 +124,19 @@ public class AppSchedulingInfo { return this.containerIdCounter.incrementAndGet(); } - public synchronized String getQueueName() { - return queue.getQueueName(); + public String getQueueName() { + try { + this.readLock.lock(); + return queue.getQueueName(); + } finally { + this.readLock.unlock(); + } } - public synchronized boolean isPending() { + public boolean isPending() { return pending; } - + public Set getRequestedPartitions() { return requestedPartitions; } @@ -130,88 +144,103 @@ public class AppSchedulingInfo { /** * Clear any pending requests from this application. */ - private synchronized void clearRequests() { + private void clearRequests() { schedulerKeys.clear(); resourceRequestMap.clear(); LOG.info("Application " + applicationId + " requests cleared"); } - public synchronized boolean hasIncreaseRequest(NodeId nodeId) { - Map> - requestsOnNode = containerIncreaseRequestMap.get(nodeId); - return requestsOnNode == null ? false : requestsOnNode.size() > 0; + public boolean hasIncreaseRequest(NodeId nodeId) { + try { + this.readLock.lock(); + Map> + requestsOnNode = containerIncreaseRequestMap.get(nodeId); + return requestsOnNode == null ? false : requestsOnNode.size() > 0; + } finally { + this.readLock.unlock(); + } } - public synchronized Map + public Map getIncreaseRequests(NodeId nodeId, SchedulerRequestKey schedulerKey) { - Map> - requestsOnNode = containerIncreaseRequestMap.get(nodeId); - return requestsOnNode == null ? null : requestsOnNode.get( - schedulerKey); + try { + this.readLock.lock(); + Map> + requestsOnNode = containerIncreaseRequestMap.get(nodeId); + return requestsOnNode == null ? null : requestsOnNode.get( + schedulerKey); + } finally { + this.readLock.unlock(); + } } /** * return true if any of the existing increase requests are updated, * false if none of them are updated */ - public synchronized boolean updateIncreaseRequests( + public boolean updateIncreaseRequests( List increaseRequests) { boolean resourceUpdated = false; - for (SchedContainerChangeRequest r : increaseRequests) { - if (r.getRMContainer().getState() != RMContainerState.RUNNING) { - LOG.warn("rmContainer's state is not RUNNING, for increase request with" - + " container-id=" + r.getContainerId()); - continue; - } - try { - RMServerUtils.checkSchedContainerChangeRequest(r, true); - } catch (YarnException e) { - LOG.warn("Error happens when checking increase request, Ignoring.." - + " exception=", e); - continue; - } - NodeId nodeId = r.getRMContainer().getAllocatedNode(); + try { + this.writeLock.lock(); + for (SchedContainerChangeRequest r : increaseRequests) { + if (r.getRMContainer().getState() != RMContainerState.RUNNING) { + LOG.warn("rmContainer's state is not RUNNING, for increase request" + + " with container-id=" + r.getContainerId()); + continue; + } + try { + RMServerUtils.checkSchedContainerChangeRequest(r, true); + } catch (YarnException e) { + LOG.warn("Error happens when checking increase request, Ignoring.." + + " exception=", e); + continue; + } + NodeId nodeId = r.getRMContainer().getAllocatedNode(); - Map> - requestsOnNode = containerIncreaseRequestMap.get(nodeId); - if (null == requestsOnNode) { - requestsOnNode = new TreeMap<>(); - containerIncreaseRequestMap.put(nodeId, requestsOnNode); - } + Map> + requestsOnNode = containerIncreaseRequestMap.get(nodeId); + if (null == requestsOnNode) { + requestsOnNode = new TreeMap<>(); + containerIncreaseRequestMap.put(nodeId, requestsOnNode); + } - SchedContainerChangeRequest prevChangeRequest = - getIncreaseRequest(nodeId, - r.getRMContainer().getAllocatedSchedulerKey(), - r.getContainerId()); - if (null != prevChangeRequest) { - if (Resources.equals(prevChangeRequest.getTargetCapacity(), - r.getTargetCapacity())) { - // increase request hasn't changed + SchedContainerChangeRequest prevChangeRequest = + getIncreaseRequest(nodeId, + r.getRMContainer().getAllocatedSchedulerKey(), + r.getContainerId()); + if (null != prevChangeRequest) { + if (Resources.equals(prevChangeRequest.getTargetCapacity(), + r.getTargetCapacity())) { + // increase request hasn't changed + continue; + } + + // remove the old one, as we will use the new one going forward + removeIncreaseRequest(nodeId, + prevChangeRequest.getRMContainer().getAllocatedSchedulerKey(), + prevChangeRequest.getContainerId()); + } + + if (Resources.equals(r.getTargetCapacity(), + r.getRMContainer().getAllocatedResource())) { + if (LOG.isDebugEnabled()) { + LOG.debug("Trying to increase container " + r.getContainerId() + + ", target capacity = previous capacity = " + prevChangeRequest + + ". Will ignore this increase request."); + } continue; } - // remove the old one, as we will use the new one going forward - removeIncreaseRequest(nodeId, - prevChangeRequest.getRMContainer().getAllocatedSchedulerKey(), - prevChangeRequest.getContainerId()); + // add the new one + resourceUpdated = true; + insertIncreaseRequest(r); } - - if (Resources.equals(r.getTargetCapacity(), - r.getRMContainer().getAllocatedResource())) { - if (LOG.isDebugEnabled()) { - LOG.debug("Trying to increase container " + r.getContainerId() - + ", target capacity = previous capacity = " + prevChangeRequest - + ". Will ignore this increase request."); - } - continue; - } - - // add the new one - resourceUpdated = true; - insertIncreaseRequest(r); + return resourceUpdated; + } finally { + this.writeLock.unlock(); } - return resourceUpdated; } /** @@ -275,61 +304,71 @@ public class AppSchedulingInfo { } } - public synchronized boolean removeIncreaseRequest(NodeId nodeId, + public boolean removeIncreaseRequest(NodeId nodeId, SchedulerRequestKey schedulerKey, ContainerId containerId) { - Map> - requestsOnNode = containerIncreaseRequestMap.get(nodeId); - if (null == requestsOnNode) { - return false; - } + try { + this.writeLock.lock(); + Map> + requestsOnNode = containerIncreaseRequestMap.get(nodeId); + if (null == requestsOnNode) { + return false; + } - Map requestsOnNodeWithPriority = - requestsOnNode.get(schedulerKey); - if (null == requestsOnNodeWithPriority) { - return false; - } + Map requestsOnNodeWithPriority = + requestsOnNode.get(schedulerKey); + if (null == requestsOnNodeWithPriority) { + return false; + } - SchedContainerChangeRequest request = - requestsOnNodeWithPriority.remove(containerId); + SchedContainerChangeRequest request = + requestsOnNodeWithPriority.remove(containerId); - // remove hierarchies if it becomes empty - if (requestsOnNodeWithPriority.isEmpty()) { - requestsOnNode.remove(schedulerKey); - decrementSchedulerKeyReference(schedulerKey); - } - if (requestsOnNode.isEmpty()) { - containerIncreaseRequestMap.remove(nodeId); - } - - if (request == null) { - return false; - } + // remove hierarchies if it becomes empty + if (requestsOnNodeWithPriority.isEmpty()) { + requestsOnNode.remove(schedulerKey); + decrementSchedulerKeyReference(schedulerKey); + } + if (requestsOnNode.isEmpty()) { + containerIncreaseRequestMap.remove(nodeId); + } - // update queue's pending resource if request exists - String partition = request.getRMContainer().getNodeLabelExpression(); - Resource delta = request.getDeltaCapacity(); - appResourceUsage.decPending(partition, delta); - queue.decPendingResource(partition, delta); - - if (LOG.isDebugEnabled()) { - LOG.debug("remove increase request:" + request); + if (request == null) { + return false; + } + + // update queue's pending resource if request exists + String partition = request.getRMContainer().getNodeLabelExpression(); + Resource delta = request.getDeltaCapacity(); + appResourceUsage.decPending(partition, delta); + queue.decPendingResource(partition, delta); + + if (LOG.isDebugEnabled()) { + LOG.debug("remove increase request:" + request); + } + + return true; + } finally { + this.writeLock.unlock(); } - - return true; } public SchedContainerChangeRequest getIncreaseRequest(NodeId nodeId, SchedulerRequestKey schedulerKey, ContainerId containerId) { - Map> - requestsOnNode = containerIncreaseRequestMap.get(nodeId); - if (null == requestsOnNode) { - return null; - } + try { + this.readLock.lock(); + Map> + requestsOnNode = containerIncreaseRequestMap.get(nodeId); + if (null == requestsOnNode) { + return null; + } - Map requestsOnNodeWithPriority = - requestsOnNode.get(schedulerKey); - return requestsOnNodeWithPriority == null ? null - : requestsOnNodeWithPriority.get(containerId); + Map requestsOnNodeWithPriority = + requestsOnNode.get(schedulerKey); + return requestsOnNodeWithPriority == null ? null + : requestsOnNodeWithPriority.get(containerId); + } finally { + this.readLock.unlock(); + } } /** @@ -343,49 +382,54 @@ public class AppSchedulingInfo { * recover ResourceRequest on preemption * @return true if any resource was updated, false otherwise */ - public synchronized boolean updateResourceRequests( - List requests, + public boolean updateResourceRequests(List requests, boolean recoverPreemptedRequestForAContainer) { // Flag to track if any incoming requests update "ANY" requests boolean anyResourcesUpdated = false; - // Update resource requests - for (ResourceRequest request : requests) { - SchedulerRequestKey schedulerKey = SchedulerRequestKey.create(request); - String resourceName = request.getResourceName(); + try { + this.writeLock.lock(); + // Update resource requests + for (ResourceRequest request : requests) { + SchedulerRequestKey schedulerKey = SchedulerRequestKey.create(request); + String resourceName = request.getResourceName(); - // Update node labels if required - updateNodeLabels(request); + // Update node labels if required + updateNodeLabels(request); - Map asks = - this.resourceRequestMap.get(schedulerKey); - if (asks == null) { - asks = new ConcurrentHashMap<>(); - this.resourceRequestMap.put(schedulerKey, asks); - } - - // Increment number of containers if recovering preempted resources - ResourceRequest lastRequest = asks.get(resourceName); - if (recoverPreemptedRequestForAContainer && lastRequest != null) { - request.setNumContainers(lastRequest.getNumContainers() + 1); - } - - // Update asks - asks.put(resourceName, request); - - if (resourceName.equals(ResourceRequest.ANY)) { - //update the applications requested labels set - requestedPartitions.add(request.getNodeLabelExpression() == null - ? RMNodeLabelsManager.NO_LABEL : request.getNodeLabelExpression()); - - anyResourcesUpdated = true; - - // Update pendingResources - updatePendingResources(lastRequest, request, schedulerKey, - queue.getMetrics()); + Map asks = + this.resourceRequestMap.get(schedulerKey); + if (asks == null) { + asks = new ConcurrentHashMap<>(); + this.resourceRequestMap.put(schedulerKey, asks); + } + + // Increment number of containers if recovering preempted resources + ResourceRequest lastRequest = asks.get(resourceName); + if (recoverPreemptedRequestForAContainer && lastRequest != null) { + request.setNumContainers(lastRequest.getNumContainers() + 1); + } + + // Update asks + asks.put(resourceName, request); + + if (resourceName.equals(ResourceRequest.ANY)) { + //update the applications requested labels set + requestedPartitions.add(request.getNodeLabelExpression() == null + ? RMNodeLabelsManager.NO_LABEL : + request.getNodeLabelExpression()); + + anyResourcesUpdated = true; + + // Update pendingResources + updatePendingResources(lastRequest, request, schedulerKey, + queue.getMetrics()); + } } + return anyResourcesUpdated; + } finally { + this.writeLock.unlock(); } - return anyResourcesUpdated; } private void updatePendingResources(ResourceRequest lastRequest, @@ -529,34 +573,49 @@ public class AppSchedulingInfo { return userBlacklistChanged.getAndSet(false); } - public synchronized Collection getSchedulerKeys() { + public Collection getSchedulerKeys() { return schedulerKeys.keySet(); } - public synchronized Map getResourceRequests( + public Map getResourceRequests( SchedulerRequestKey schedulerKey) { return resourceRequestMap.get(schedulerKey); } - public synchronized List getAllResourceRequests() { + public List getAllResourceRequests() { List ret = new ArrayList<>(); - for (Map r : resourceRequestMap.values()) { - ret.addAll(r.values()); + try { + this.readLock.lock(); + for (Map r : resourceRequestMap.values()) { + ret.addAll(r.values()); + } + } finally { + this.readLock.unlock(); } return ret; } - public synchronized ResourceRequest getResourceRequest( - SchedulerRequestKey schedulerKey, String resourceName) { - Map nodeRequests = - resourceRequestMap.get(schedulerKey); - return (nodeRequests == null) ? null : nodeRequests.get(resourceName); + public ResourceRequest getResourceRequest(SchedulerRequestKey schedulerKey, + String resourceName) { + try { + this.readLock.lock(); + Map nodeRequests = + resourceRequestMap.get(schedulerKey); + return (nodeRequests == null) ? null : nodeRequests.get(resourceName); + } finally { + this.readLock.unlock(); + } } - public synchronized Resource getResource(SchedulerRequestKey schedulerKey) { - ResourceRequest request = - getResourceRequest(schedulerKey, ResourceRequest.ANY); - return (request == null) ? null : request.getCapability(); + public Resource getResource(SchedulerRequestKey schedulerKey) { + try { + this.readLock.lock(); + ResourceRequest request = + getResourceRequest(schedulerKey, ResourceRequest.ANY); + return (request == null) ? null : request.getCapability(); + } finally { + this.readLock.unlock(); + } } /** @@ -582,8 +641,7 @@ public class AppSchedulingInfo { } } - public synchronized void increaseContainer( - SchedContainerChangeRequest increaseRequest) { + public void increaseContainer(SchedContainerChangeRequest increaseRequest) { NodeId nodeId = increaseRequest.getNodeId(); SchedulerRequestKey schedulerKey = increaseRequest.getRMContainer().getAllocatedSchedulerKey(); @@ -596,16 +654,21 @@ public class AppSchedulingInfo { + increaseRequest.getNodeId() + " user=" + user + " resource=" + deltaCapacity); } - // Set queue metrics - queue.getMetrics().allocateResources(user, deltaCapacity); - // remove the increase request from pending increase request map - removeIncreaseRequest(nodeId, schedulerKey, containerId); - // update usage - appResourceUsage.incUsed(increaseRequest.getNodePartition(), deltaCapacity); + try { + this.writeLock.lock(); + // Set queue metrics + queue.getMetrics().allocateResources(user, deltaCapacity); + // remove the increase request from pending increase request map + removeIncreaseRequest(nodeId, schedulerKey, containerId); + // update usage + appResourceUsage.incUsed(increaseRequest.getNodePartition(), + deltaCapacity); + } finally { + this.writeLock.unlock(); + } } - public synchronized void decreaseContainer( - SchedContainerChangeRequest decreaseRequest) { + public void decreaseContainer(SchedContainerChangeRequest decreaseRequest) { // Delta is negative when it's a decrease request Resource absDelta = Resources.negate(decreaseRequest.getDeltaCapacity()); @@ -615,12 +678,17 @@ public class AppSchedulingInfo { + decreaseRequest.getNodeId() + " user=" + user + " resource=" + absDelta); } - - // Set queue metrics - queue.getMetrics().releaseResources(user, absDelta); - // update usage - appResourceUsage.decUsed(decreaseRequest.getNodePartition(), absDelta); + try { + this.writeLock.lock(); + // Set queue metrics + queue.getMetrics().releaseResources(user, absDelta); + + // update usage + appResourceUsage.decUsed(decreaseRequest.getNodePartition(), absDelta); + } finally { + this.writeLock.unlock(); + } } /** @@ -633,43 +701,48 @@ public class AppSchedulingInfo { * @param containerAllocated Container Allocated * @return List of ResourceRequests */ - public synchronized List allocate(NodeType type, - SchedulerNode node, SchedulerRequestKey schedulerKey, - ResourceRequest request, Container containerAllocated) { + public List allocate(NodeType type, SchedulerNode node, + SchedulerRequestKey schedulerKey, ResourceRequest request, + Container containerAllocated) { List resourceRequests = new ArrayList<>(); - if (type == NodeType.NODE_LOCAL) { - allocateNodeLocal(node, schedulerKey, request, resourceRequests); - } else if (type == NodeType.RACK_LOCAL) { - allocateRackLocal(node, schedulerKey, request, resourceRequests); - } else { - allocateOffSwitch(request, resourceRequests, schedulerKey); - } - QueueMetrics metrics = queue.getMetrics(); - if (pending) { - // once an allocation is done we assume the application is - // running from scheduler's POV. - pending = false; - metrics.runAppAttempt(applicationId, user); - } + try { + this.writeLock.lock(); + if (type == NodeType.NODE_LOCAL) { + allocateNodeLocal(node, schedulerKey, request, resourceRequests); + } else if (type == NodeType.RACK_LOCAL) { + allocateRackLocal(node, schedulerKey, request, resourceRequests); + } else { + allocateOffSwitch(request, resourceRequests, schedulerKey); + } + QueueMetrics metrics = queue.getMetrics(); + if (pending) { + // once an allocation is done we assume the application is + // running from scheduler's POV. + pending = false; + metrics.runAppAttempt(applicationId, user); + } - if (LOG.isDebugEnabled()) { - LOG.debug("allocate: applicationId=" + applicationId - + " container=" + containerAllocated.getId() - + " host=" + containerAllocated.getNodeId().toString() - + " user=" + user - + " resource=" + request.getCapability() - + " type=" + type); + if (LOG.isDebugEnabled()) { + LOG.debug("allocate: applicationId=" + applicationId + + " container=" + containerAllocated.getId() + + " host=" + containerAllocated.getNodeId().toString() + + " user=" + user + + " resource=" + request.getCapability() + + " type=" + type); + } + metrics.allocateResources(user, 1, request.getCapability(), true); + metrics.incrNodeTypeAggregations(user, type); + return resourceRequests; + } finally { + this.writeLock.unlock(); } - metrics.allocateResources(user, 1, request.getCapability(), true); - metrics.incrNodeTypeAggregations(user, type); - return resourceRequests; } /** * The {@link ResourceScheduler} is allocating data-local resources to the * application. */ - private synchronized void allocateNodeLocal(SchedulerNode node, + private void allocateNodeLocal(SchedulerNode node, SchedulerRequestKey schedulerKey, ResourceRequest nodeLocalRequest, List resourceRequests) { // Update future requirements @@ -701,7 +774,7 @@ public class AppSchedulingInfo { * The {@link ResourceScheduler} is allocating data-local resources to the * application. */ - private synchronized void allocateRackLocal(SchedulerNode node, + private void allocateRackLocal(SchedulerNode node, SchedulerRequestKey schedulerKey, ResourceRequest rackLocalRequest, List resourceRequests) { // Update future requirements @@ -720,8 +793,8 @@ public class AppSchedulingInfo { * The {@link ResourceScheduler} is allocating data-local resources to the * application. */ - private synchronized void allocateOffSwitch( - ResourceRequest offSwitchRequest, List resourceRequests, + private void allocateOffSwitch(ResourceRequest offSwitchRequest, + List resourceRequests, SchedulerRequestKey schedulerKey) { // Update future requirements decrementOutstanding(offSwitchRequest, schedulerKey); @@ -729,8 +802,8 @@ public class AppSchedulingInfo { resourceRequests.add(cloneResourceRequest(offSwitchRequest)); } - private synchronized void decrementOutstanding( - ResourceRequest offSwitchRequest, SchedulerRequestKey schedulerKey) { + private void decrementOutstanding(ResourceRequest offSwitchRequest, + SchedulerRequestKey schedulerKey) { int numOffSwitchContainers = offSwitchRequest.getNumContainers() - 1; // Do not remove ANY @@ -748,66 +821,81 @@ public class AppSchedulingInfo { queue.decPendingResource(offSwitchRequest.getNodeLabelExpression(), offSwitchRequest.getCapability()); } - - private synchronized void checkForDeactivation() { + + private void checkForDeactivation() { if (schedulerKeys.isEmpty()) { activeUsersManager.deactivateApplication(user, applicationId); } } - public synchronized void move(Queue newQueue) { - QueueMetrics oldMetrics = queue.getMetrics(); - QueueMetrics newMetrics = newQueue.getMetrics(); - for (Map asks : resourceRequestMap.values()) { - ResourceRequest request = asks.get(ResourceRequest.ANY); - if (request != null) { - oldMetrics.decrPendingResources(user, request.getNumContainers(), - request.getCapability()); - newMetrics.incrPendingResources(user, request.getNumContainers(), - request.getCapability()); - - Resource delta = Resources.multiply(request.getCapability(), - request.getNumContainers()); - // Update Queue - queue.decPendingResource(request.getNodeLabelExpression(), delta); - newQueue.incPendingResource(request.getNodeLabelExpression(), delta); + public void move(Queue newQueue) { + try { + this.writeLock.lock(); + QueueMetrics oldMetrics = queue.getMetrics(); + QueueMetrics newMetrics = newQueue.getMetrics(); + for (Map asks : resourceRequestMap.values()) { + ResourceRequest request = asks.get(ResourceRequest.ANY); + if (request != null) { + oldMetrics.decrPendingResources(user, request.getNumContainers(), + request.getCapability()); + newMetrics.incrPendingResources(user, request.getNumContainers(), + request.getCapability()); + + Resource delta = Resources.multiply(request.getCapability(), + request.getNumContainers()); + // Update Queue + queue.decPendingResource(request.getNodeLabelExpression(), delta); + newQueue.incPendingResource(request.getNodeLabelExpression(), delta); + } } + oldMetrics.moveAppFrom(this); + newMetrics.moveAppTo(this); + activeUsersManager.deactivateApplication(user, applicationId); + activeUsersManager = newQueue.getActiveUsersManager(); + activeUsersManager.activateApplication(user, applicationId); + this.queue = newQueue; + } finally { + this.writeLock.unlock(); } - oldMetrics.moveAppFrom(this); - newMetrics.moveAppTo(this); - activeUsersManager.deactivateApplication(user, applicationId); - activeUsersManager = newQueue.getActiveUsersManager(); - activeUsersManager.activateApplication(user, applicationId); - this.queue = newQueue; } - public synchronized void stop() { + public void stop() { // clear pending resources metrics for the application - QueueMetrics metrics = queue.getMetrics(); - for (Map asks : resourceRequestMap.values()) { - ResourceRequest request = asks.get(ResourceRequest.ANY); - if (request != null) { - metrics.decrPendingResources(user, request.getNumContainers(), - request.getCapability()); - - // Update Queue - queue.decPendingResource( - request.getNodeLabelExpression(), - Resources.multiply(request.getCapability(), - request.getNumContainers())); + try { + this.writeLock.lock(); + QueueMetrics metrics = queue.getMetrics(); + for (Map asks : resourceRequestMap.values()) { + ResourceRequest request = asks.get(ResourceRequest.ANY); + if (request != null) { + metrics.decrPendingResources(user, request.getNumContainers(), + request.getCapability()); + + // Update Queue + queue.decPendingResource( + request.getNodeLabelExpression(), + Resources.multiply(request.getCapability(), + request.getNumContainers())); + } } + metrics.finishAppAttempt(applicationId, pending, user); + + // Clear requests themselves + clearRequests(); + } finally { + this.writeLock.unlock(); } - metrics.finishAppAttempt(applicationId, pending, user); - - // Clear requests themselves - clearRequests(); } - public synchronized void setQueue(Queue queue) { - this.queue = queue; + public void setQueue(Queue queue) { + try { + this.writeLock.lock(); + this.queue = queue; + } finally { + this.writeLock.unlock(); + } } - public Set getBlackList() { + private Set getBlackList() { return this.placesBlacklistedByApp; } @@ -817,31 +905,36 @@ public class AppSchedulingInfo { } } - public synchronized void transferStateFromPreviousAppSchedulingInfo( + public void transferStateFromPreviousAppSchedulingInfo( AppSchedulingInfo appInfo) { - // This should not require locking the userBlacklist since it will not be - // used by this instance until after setCurrentAppAttempt. + // This should not require locking the placesBlacklistedByApp since it will + // not be used by this instance until after setCurrentAppAttempt. this.placesBlacklistedByApp = appInfo.getBlackList(); } - public synchronized void recoverContainer(RMContainer rmContainer) { - QueueMetrics metrics = queue.getMetrics(); - if (pending) { - // If there was any container to recover, the application was - // running from scheduler's POV. - pending = false; - metrics.runAppAttempt(applicationId, user); - } + public void recoverContainer(RMContainer rmContainer) { + try { + this.writeLock.lock(); + QueueMetrics metrics = queue.getMetrics(); + if (pending) { + // If there was any container to recover, the application was + // running from scheduler's POV. + pending = false; + metrics.runAppAttempt(applicationId, user); + } - // Container is completed. Skip recovering resources. - if (rmContainer.getState().equals(RMContainerState.COMPLETED)) { - return; - } + // Container is completed. Skip recovering resources. + if (rmContainer.getState().equals(RMContainerState.COMPLETED)) { + return; + } - metrics.allocateResources(user, 1, rmContainer.getAllocatedResource(), - false); + metrics.allocateResources(user, 1, rmContainer.getAllocatedResource(), + false); + } finally { + this.writeLock.unlock(); + } } - + public ResourceRequest cloneResourceRequest(ResourceRequest request) { ResourceRequest newRequest = ResourceRequest.newInstance(request.getPriority(),