YARN-3142. Improve locks in AppSchedulingInfo. (Varun Saxena via wangda)

(cherry picked from commit 1831be8e73)
This commit is contained in:
Wangda Tan 2016-09-27 11:54:55 -07:00
parent 269401dc83
commit 379c2b3e5b
1 changed files with 359 additions and 266 deletions

View File

@ -30,6 +30,7 @@ import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@ -67,7 +68,8 @@ public class AppSchedulingInfo {
private Queue queue;
private ActiveUsersManager activeUsersManager;
private boolean pending = true; // whether accepted/allocated by scheduler
// whether accepted/allocated by scheduler
private volatile boolean pending = true;
private ResourceUsage appResourceUsage;
private AtomicBoolean userBlacklistChanged = new AtomicBoolean(false);
@ -86,6 +88,9 @@ public class AppSchedulingInfo {
SchedContainerChangeRequest>>> containerIncreaseRequestMap =
new ConcurrentHashMap<>();
private final ReentrantReadWriteLock.ReadLock readLock;
private final ReentrantReadWriteLock.WriteLock writeLock;
public AppSchedulingInfo(ApplicationAttemptId appAttemptId,
String user, Queue queue, ActiveUsersManager activeUsersManager,
long epoch, ResourceUsage appResourceUsage) {
@ -97,6 +102,10 @@ public class AppSchedulingInfo {
this.containerIdCounter = new AtomicLong(
epoch << ResourceManager.EPOCH_BIT_SHIFT);
this.appResourceUsage = appResourceUsage;
ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
readLock = lock.readLock();
writeLock = lock.writeLock();
}
public ApplicationId getApplicationId() {
@ -115,11 +124,16 @@ public class AppSchedulingInfo {
return this.containerIdCounter.incrementAndGet();
}
public synchronized String getQueueName() {
public String getQueueName() {
try {
this.readLock.lock();
return queue.getQueueName();
} finally {
this.readLock.unlock();
}
}
public synchronized boolean isPending() {
public boolean isPending() {
return pending;
}
@ -130,38 +144,50 @@ public class AppSchedulingInfo {
/**
* Clear any pending requests from this application.
*/
private synchronized void clearRequests() {
private void clearRequests() {
schedulerKeys.clear();
resourceRequestMap.clear();
LOG.info("Application " + applicationId + " requests cleared");
}
public synchronized boolean hasIncreaseRequest(NodeId nodeId) {
public boolean hasIncreaseRequest(NodeId nodeId) {
try {
this.readLock.lock();
Map<SchedulerRequestKey, Map<ContainerId, SchedContainerChangeRequest>>
requestsOnNode = containerIncreaseRequestMap.get(nodeId);
return requestsOnNode == null ? false : requestsOnNode.size() > 0;
} finally {
this.readLock.unlock();
}
}
public synchronized Map<ContainerId, SchedContainerChangeRequest>
public Map<ContainerId, SchedContainerChangeRequest>
getIncreaseRequests(NodeId nodeId, SchedulerRequestKey schedulerKey) {
try {
this.readLock.lock();
Map<SchedulerRequestKey, Map<ContainerId, SchedContainerChangeRequest>>
requestsOnNode = containerIncreaseRequestMap.get(nodeId);
return requestsOnNode == null ? null : requestsOnNode.get(
schedulerKey);
} finally {
this.readLock.unlock();
}
}
/**
* return true if any of the existing increase requests are updated,
* false if none of them are updated
*/
public synchronized boolean updateIncreaseRequests(
public boolean updateIncreaseRequests(
List<SchedContainerChangeRequest> increaseRequests) {
boolean resourceUpdated = false;
try {
this.writeLock.lock();
for (SchedContainerChangeRequest r : increaseRequests) {
if (r.getRMContainer().getState() != RMContainerState.RUNNING) {
LOG.warn("rmContainer's state is not RUNNING, for increase request with"
+ " container-id=" + r.getContainerId());
LOG.warn("rmContainer's state is not RUNNING, for increase request"
+ " with container-id=" + r.getContainerId());
continue;
}
try {
@ -212,6 +238,9 @@ public class AppSchedulingInfo {
insertIncreaseRequest(r);
}
return resourceUpdated;
} finally {
this.writeLock.unlock();
}
}
/**
@ -275,8 +304,10 @@ public class AppSchedulingInfo {
}
}
public synchronized boolean removeIncreaseRequest(NodeId nodeId,
public boolean removeIncreaseRequest(NodeId nodeId,
SchedulerRequestKey schedulerKey, ContainerId containerId) {
try {
this.writeLock.lock();
Map<SchedulerRequestKey, Map<ContainerId, SchedContainerChangeRequest>>
requestsOnNode = containerIncreaseRequestMap.get(nodeId);
if (null == requestsOnNode) {
@ -316,10 +347,15 @@ public class AppSchedulingInfo {
}
return true;
} finally {
this.writeLock.unlock();
}
}
public SchedContainerChangeRequest getIncreaseRequest(NodeId nodeId,
SchedulerRequestKey schedulerKey, ContainerId containerId) {
try {
this.readLock.lock();
Map<SchedulerRequestKey, Map<ContainerId, SchedContainerChangeRequest>>
requestsOnNode = containerIncreaseRequestMap.get(nodeId);
if (null == requestsOnNode) {
@ -330,6 +366,9 @@ public class AppSchedulingInfo {
requestsOnNode.get(schedulerKey);
return requestsOnNodeWithPriority == null ? null
: requestsOnNodeWithPriority.get(containerId);
} finally {
this.readLock.unlock();
}
}
/**
@ -343,12 +382,13 @@ public class AppSchedulingInfo {
* recover ResourceRequest on preemption
* @return true if any resource was updated, false otherwise
*/
public synchronized boolean updateResourceRequests(
List<ResourceRequest> requests,
public boolean updateResourceRequests(List<ResourceRequest> requests,
boolean recoverPreemptedRequestForAContainer) {
// Flag to track if any incoming requests update "ANY" requests
boolean anyResourcesUpdated = false;
try {
this.writeLock.lock();
// Update resource requests
for (ResourceRequest request : requests) {
SchedulerRequestKey schedulerKey = SchedulerRequestKey.create(request);
@ -376,7 +416,8 @@ public class AppSchedulingInfo {
if (resourceName.equals(ResourceRequest.ANY)) {
//update the applications requested labels set
requestedPartitions.add(request.getNodeLabelExpression() == null
? RMNodeLabelsManager.NO_LABEL : request.getNodeLabelExpression());
? RMNodeLabelsManager.NO_LABEL :
request.getNodeLabelExpression());
anyResourcesUpdated = true;
@ -386,6 +427,9 @@ public class AppSchedulingInfo {
}
}
return anyResourcesUpdated;
} finally {
this.writeLock.unlock();
}
}
private void updatePendingResources(ResourceRequest lastRequest,
@ -529,34 +573,49 @@ public class AppSchedulingInfo {
return userBlacklistChanged.getAndSet(false);
}
public synchronized Collection<SchedulerRequestKey> getSchedulerKeys() {
public Collection<SchedulerRequestKey> getSchedulerKeys() {
return schedulerKeys.keySet();
}
public synchronized Map<String, ResourceRequest> getResourceRequests(
public Map<String, ResourceRequest> getResourceRequests(
SchedulerRequestKey schedulerKey) {
return resourceRequestMap.get(schedulerKey);
}
public synchronized List<ResourceRequest> getAllResourceRequests() {
public List<ResourceRequest> getAllResourceRequests() {
List<ResourceRequest> ret = new ArrayList<>();
try {
this.readLock.lock();
for (Map<String, ResourceRequest> r : resourceRequestMap.values()) {
ret.addAll(r.values());
}
} finally {
this.readLock.unlock();
}
return ret;
}
public synchronized ResourceRequest getResourceRequest(
SchedulerRequestKey schedulerKey, String resourceName) {
public ResourceRequest getResourceRequest(SchedulerRequestKey schedulerKey,
String resourceName) {
try {
this.readLock.lock();
Map<String, ResourceRequest> nodeRequests =
resourceRequestMap.get(schedulerKey);
return (nodeRequests == null) ? null : nodeRequests.get(resourceName);
} finally {
this.readLock.unlock();
}
}
public synchronized Resource getResource(SchedulerRequestKey schedulerKey) {
public Resource getResource(SchedulerRequestKey schedulerKey) {
try {
this.readLock.lock();
ResourceRequest request =
getResourceRequest(schedulerKey, ResourceRequest.ANY);
return (request == null) ? null : request.getCapability();
} finally {
this.readLock.unlock();
}
}
/**
@ -582,8 +641,7 @@ public class AppSchedulingInfo {
}
}
public synchronized void increaseContainer(
SchedContainerChangeRequest increaseRequest) {
public void increaseContainer(SchedContainerChangeRequest increaseRequest) {
NodeId nodeId = increaseRequest.getNodeId();
SchedulerRequestKey schedulerKey =
increaseRequest.getRMContainer().getAllocatedSchedulerKey();
@ -596,16 +654,21 @@ public class AppSchedulingInfo {
+ increaseRequest.getNodeId() + " user=" + user + " resource="
+ deltaCapacity);
}
try {
this.writeLock.lock();
// Set queue metrics
queue.getMetrics().allocateResources(user, deltaCapacity);
// remove the increase request from pending increase request map
removeIncreaseRequest(nodeId, schedulerKey, containerId);
// update usage
appResourceUsage.incUsed(increaseRequest.getNodePartition(), deltaCapacity);
appResourceUsage.incUsed(increaseRequest.getNodePartition(),
deltaCapacity);
} finally {
this.writeLock.unlock();
}
}
public synchronized void decreaseContainer(
SchedContainerChangeRequest decreaseRequest) {
public void decreaseContainer(SchedContainerChangeRequest decreaseRequest) {
// Delta is negative when it's a decrease request
Resource absDelta = Resources.negate(decreaseRequest.getDeltaCapacity());
@ -616,11 +679,16 @@ public class AppSchedulingInfo {
+ absDelta);
}
try {
this.writeLock.lock();
// Set queue metrics
queue.getMetrics().releaseResources(user, absDelta);
// update usage
appResourceUsage.decUsed(decreaseRequest.getNodePartition(), absDelta);
} finally {
this.writeLock.unlock();
}
}
/**
@ -633,10 +701,12 @@ public class AppSchedulingInfo {
* @param containerAllocated Container Allocated
* @return List of ResourceRequests
*/
public synchronized List<ResourceRequest> allocate(NodeType type,
SchedulerNode node, SchedulerRequestKey schedulerKey,
ResourceRequest request, Container containerAllocated) {
public List<ResourceRequest> allocate(NodeType type, SchedulerNode node,
SchedulerRequestKey schedulerKey, ResourceRequest request,
Container containerAllocated) {
List<ResourceRequest> resourceRequests = new ArrayList<>();
try {
this.writeLock.lock();
if (type == NodeType.NODE_LOCAL) {
allocateNodeLocal(node, schedulerKey, request, resourceRequests);
} else if (type == NodeType.RACK_LOCAL) {
@ -663,13 +733,16 @@ public class AppSchedulingInfo {
metrics.allocateResources(user, 1, request.getCapability(), true);
metrics.incrNodeTypeAggregations(user, type);
return resourceRequests;
} finally {
this.writeLock.unlock();
}
}
/**
* The {@link ResourceScheduler} is allocating data-local resources to the
* application.
*/
private synchronized void allocateNodeLocal(SchedulerNode node,
private void allocateNodeLocal(SchedulerNode node,
SchedulerRequestKey schedulerKey, ResourceRequest nodeLocalRequest,
List<ResourceRequest> resourceRequests) {
// Update future requirements
@ -701,7 +774,7 @@ public class AppSchedulingInfo {
* The {@link ResourceScheduler} is allocating data-local resources to the
* application.
*/
private synchronized void allocateRackLocal(SchedulerNode node,
private void allocateRackLocal(SchedulerNode node,
SchedulerRequestKey schedulerKey, ResourceRequest rackLocalRequest,
List<ResourceRequest> resourceRequests) {
// Update future requirements
@ -720,8 +793,8 @@ public class AppSchedulingInfo {
* The {@link ResourceScheduler} is allocating data-local resources to the
* application.
*/
private synchronized void allocateOffSwitch(
ResourceRequest offSwitchRequest, List<ResourceRequest> resourceRequests,
private void allocateOffSwitch(ResourceRequest offSwitchRequest,
List<ResourceRequest> resourceRequests,
SchedulerRequestKey schedulerKey) {
// Update future requirements
decrementOutstanding(offSwitchRequest, schedulerKey);
@ -729,8 +802,8 @@ public class AppSchedulingInfo {
resourceRequests.add(cloneResourceRequest(offSwitchRequest));
}
private synchronized void decrementOutstanding(
ResourceRequest offSwitchRequest, SchedulerRequestKey schedulerKey) {
private void decrementOutstanding(ResourceRequest offSwitchRequest,
SchedulerRequestKey schedulerKey) {
int numOffSwitchContainers = offSwitchRequest.getNumContainers() - 1;
// Do not remove ANY
@ -749,13 +822,15 @@ public class AppSchedulingInfo {
offSwitchRequest.getCapability());
}
private synchronized void checkForDeactivation() {
private void checkForDeactivation() {
if (schedulerKeys.isEmpty()) {
activeUsersManager.deactivateApplication(user, applicationId);
}
}
public synchronized void move(Queue newQueue) {
public void move(Queue newQueue) {
try {
this.writeLock.lock();
QueueMetrics oldMetrics = queue.getMetrics();
QueueMetrics newMetrics = newQueue.getMetrics();
for (Map<String, ResourceRequest> asks : resourceRequestMap.values()) {
@ -779,10 +854,15 @@ public class AppSchedulingInfo {
activeUsersManager = newQueue.getActiveUsersManager();
activeUsersManager.activateApplication(user, applicationId);
this.queue = newQueue;
} finally {
this.writeLock.unlock();
}
}
public synchronized void stop() {
public void stop() {
// clear pending resources metrics for the application
try {
this.writeLock.lock();
QueueMetrics metrics = queue.getMetrics();
for (Map<String, ResourceRequest> asks : resourceRequestMap.values()) {
ResourceRequest request = asks.get(ResourceRequest.ANY);
@ -801,13 +881,21 @@ public class AppSchedulingInfo {
// Clear requests themselves
clearRequests();
} finally {
this.writeLock.unlock();
}
}
public synchronized void setQueue(Queue queue) {
public void setQueue(Queue queue) {
try {
this.writeLock.lock();
this.queue = queue;
} finally {
this.writeLock.unlock();
}
}
public Set<String> getBlackList() {
private Set<String> getBlackList() {
return this.placesBlacklistedByApp;
}
@ -817,14 +905,16 @@ public class AppSchedulingInfo {
}
}
public synchronized void transferStateFromPreviousAppSchedulingInfo(
public void transferStateFromPreviousAppSchedulingInfo(
AppSchedulingInfo appInfo) {
// This should not require locking the userBlacklist since it will not be
// used by this instance until after setCurrentAppAttempt.
// This should not require locking the placesBlacklistedByApp since it will
// not be used by this instance until after setCurrentAppAttempt.
this.placesBlacklistedByApp = appInfo.getBlackList();
}
public synchronized void recoverContainer(RMContainer rmContainer) {
public void recoverContainer(RMContainer rmContainer) {
try {
this.writeLock.lock();
QueueMetrics metrics = queue.getMetrics();
if (pending) {
// If there was any container to recover, the application was
@ -840,6 +930,9 @@ public class AppSchedulingInfo {
metrics.allocateResources(user, 1, rmContainer.getAllocatedResource(),
false);
} finally {
this.writeLock.unlock();
}
}
public ResourceRequest cloneResourceRequest(ResourceRequest request) {