YARN-3141. Improve locks in SchedulerApplicationAttempt/FSAppAttempt/FiCaSchedulerApp. Contributed by Wangda Tan

(cherry picked from commit b8a30f2f17)
This commit is contained in:
Jian He 2016-09-19 16:58:39 +08:00
parent dfaac56433
commit 9942ca2bf0
4 changed files with 953 additions and 738 deletions

View File

@ -26,8 +26,11 @@ import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import com.google.common.collect.ConcurrentHashMultiset;
import org.apache.commons.lang.time.DateUtils;
import org.apache.commons.lang.time.FastDateFormat;
import org.apache.commons.logging.Log;
@ -71,8 +74,6 @@ import org.apache.hadoop.yarn.util.resource.Resources;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.collect.HashMultiset;
import com.google.common.collect.Multiset;
/**
* Represents an application attempt from the viewpoint of the scheduler.
@ -97,14 +98,14 @@ public class SchedulerApplicationAttempt implements SchedulableEntity {
protected final AppSchedulingInfo appSchedulingInfo;
protected ApplicationAttemptId attemptId;
protected Map<ContainerId, RMContainer> liveContainers =
new HashMap<ContainerId, RMContainer>();
new ConcurrentHashMap<>();
protected final Map<SchedulerRequestKey, Map<NodeId, RMContainer>>
reservedContainers = new HashMap<>();
private final Multiset<SchedulerRequestKey> reReservations =
HashMultiset.create();
private final ConcurrentHashMultiset<SchedulerRequestKey> reReservations =
ConcurrentHashMultiset.create();
private Resource resourceLimit = Resource.newInstance(0, 0);
private volatile Resource resourceLimit = Resource.newInstance(0, 0);
private boolean unmanagedAM = true;
private boolean amRunning = false;
private LogAggregationContext logAggregationContext;
@ -138,7 +139,8 @@ public class SchedulerApplicationAttempt implements SchedulableEntity {
* the application successfully schedules a task (at rack or node local), it
* is reset to 0.
*/
Multiset<SchedulerRequestKey> schedulingOpportunities = HashMultiset.create();
private ConcurrentHashMultiset<SchedulerRequestKey> schedulingOpportunities =
ConcurrentHashMultiset.create();
/**
* Count how many times the application has been given an opportunity to
@ -147,15 +149,16 @@ public class SchedulerApplicationAttempt implements SchedulableEntity {
* incremented, and each time the application successfully schedules a task,
* it is reset to 0 when schedule any task at corresponding priority.
*/
Multiset<SchedulerRequestKey> missedNonPartitionedReqSchedulingOpportunity =
HashMultiset.create();
private ConcurrentHashMultiset<SchedulerRequestKey>
missedNonPartitionedReqSchedulingOpportunity =
ConcurrentHashMultiset.create();
// Time of the last container scheduled at the current allowed level
protected Map<SchedulerRequestKey, Long> lastScheduledContainer =
new HashMap<>();
new ConcurrentHashMap<>();
protected Queue queue;
protected boolean isStopped = false;
protected volatile Queue queue;
protected volatile boolean isStopped = false;
protected String appAMNodePartitionName = CommonNodeLabelsManager.NO_LABEL;
@ -163,6 +166,9 @@ public class SchedulerApplicationAttempt implements SchedulableEntity {
private RMAppAttempt appAttempt;
protected ReentrantReadWriteLock.ReadLock readLock;
protected ReentrantReadWriteLock.WriteLock writeLock;
public SchedulerApplicationAttempt(ApplicationAttemptId applicationAttemptId,
String user, Queue queue, ActiveUsersManager activeUsersManager,
RMContext rmContext) {
@ -188,14 +194,23 @@ public class SchedulerApplicationAttempt implements SchedulableEntity {
appSubmissionContext.getLogAggregationContext();
}
}
ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
readLock = lock.readLock();
writeLock = lock.writeLock();
}
/**
* Get the live containers of the application.
* @return live containers of the application
*/
public synchronized Collection<RMContainer> getLiveContainers() {
return new ArrayList<RMContainer>(liveContainers.values());
public Collection<RMContainer> getLiveContainers() {
try {
readLock.lock();
return new ArrayList<>(liveContainers.values());
} finally {
readLock.unlock();
}
}
public AppSchedulingInfo getAppSchedulingInfo() {
@ -243,20 +258,36 @@ public class SchedulerApplicationAttempt implements SchedulableEntity {
return appSchedulingInfo.getSchedulerKeys();
}
public synchronized ResourceRequest getResourceRequest(
public ResourceRequest getResourceRequest(
SchedulerRequestKey schedulerKey, String resourceName) {
try {
readLock.lock();
return appSchedulingInfo.getResourceRequest(schedulerKey, resourceName);
} finally {
readLock.unlock();
}
public synchronized int getTotalRequiredResources(
}
public int getTotalRequiredResources(
SchedulerRequestKey schedulerKey) {
try {
readLock.lock();
ResourceRequest request =
getResourceRequest(schedulerKey, ResourceRequest.ANY);
return request == null ? 0 : request.getNumContainers();
} finally {
readLock.unlock();
}
}
public synchronized Resource getResource(SchedulerRequestKey schedulerKey) {
public Resource getResource(SchedulerRequestKey schedulerKey) {
try {
readLock.lock();
return appSchedulingInfo.getResource(schedulerKey);
} finally {
readLock.unlock();
}
}
public String getQueueName() {
@ -291,38 +322,48 @@ public class SchedulerApplicationAttempt implements SchedulableEntity {
return unmanagedAM;
}
public synchronized RMContainer getRMContainer(ContainerId id) {
public RMContainer getRMContainer(ContainerId id) {
return liveContainers.get(id);
}
public synchronized void addRMContainer(
public void addRMContainer(
ContainerId id, RMContainer rmContainer) {
try {
writeLock.lock();
liveContainers.put(id, rmContainer);
if (rmContainer.isRemotelyAllocated()) {
this.attemptResourceUsageAllocatedRemotely.incUsed(
rmContainer.getAllocatedResource());
}
} finally {
writeLock.unlock();
}
}
public synchronized void removeRMContainer(ContainerId containerId) {
public void removeRMContainer(ContainerId containerId) {
try {
writeLock.lock();
RMContainer rmContainer = liveContainers.remove(containerId);
if (rmContainer != null && rmContainer.isRemotelyAllocated()) {
this.attemptResourceUsageAllocatedRemotely.decUsed(
rmContainer.getAllocatedResource());
}
} finally {
writeLock.unlock();
}
}
protected synchronized void resetReReservations(
protected void resetReReservations(
SchedulerRequestKey schedulerKey) {
reReservations.setCount(schedulerKey, 0);
}
protected synchronized void addReReservation(
protected void addReReservation(
SchedulerRequestKey schedulerKey) {
reReservations.add(schedulerKey);
}
public synchronized int getReReservations(SchedulerRequestKey schedulerKey) {
public int getReReservations(SchedulerRequestKey schedulerKey) {
return reReservations.count(schedulerKey);
}
@ -333,7 +374,7 @@ public class SchedulerApplicationAttempt implements SchedulableEntity {
*/
@Stable
@Private
public synchronized Resource getCurrentReservation() {
public Resource getCurrentReservation() {
return attemptResourceUsage.getReserved();
}
@ -341,28 +382,43 @@ public class SchedulerApplicationAttempt implements SchedulableEntity {
return queue;
}
public synchronized boolean updateResourceRequests(
public boolean updateResourceRequests(
List<ResourceRequest> requests) {
try {
writeLock.lock();
if (!isStopped) {
return appSchedulingInfo.updateResourceRequests(requests, false);
}
return false;
} finally {
writeLock.unlock();
}
}
public synchronized void recoverResourceRequestsForContainer(
public void recoverResourceRequestsForContainer(
List<ResourceRequest> requests) {
try {
writeLock.lock();
if (!isStopped) {
appSchedulingInfo.updateResourceRequests(requests, true);
}
} finally {
writeLock.unlock();
}
}
public synchronized void stop(RMAppAttemptState rmAppAttemptFinalState) {
public void stop(RMAppAttemptState rmAppAttemptFinalState) {
try {
writeLock.lock();
// Cleanup all scheduling information
isStopped = true;
appSchedulingInfo.stop();
} finally {
writeLock.unlock();
}
}
public synchronized boolean isStopped() {
public boolean isStopped() {
return isStopped;
}
@ -370,29 +426,40 @@ public class SchedulerApplicationAttempt implements SchedulableEntity {
* Get the list of reserved containers
* @return All of the reserved containers.
*/
public synchronized List<RMContainer> getReservedContainers() {
List<RMContainer> reservedContainers = new ArrayList<RMContainer>();
for (Map.Entry<SchedulerRequestKey, Map<NodeId, RMContainer>> e :
public List<RMContainer> getReservedContainers() {
List<RMContainer> list = new ArrayList<>();
try {
readLock.lock();
for (Entry<SchedulerRequestKey, Map<NodeId, RMContainer>> e :
this.reservedContainers.entrySet()) {
reservedContainers.addAll(e.getValue().values());
list.addAll(e.getValue().values());
}
return reservedContainers;
return list;
} finally {
readLock.unlock();
}
public synchronized boolean reserveIncreasedContainer(SchedulerNode node,
}
public boolean reserveIncreasedContainer(SchedulerNode node,
SchedulerRequestKey schedulerKey, RMContainer rmContainer,
Resource reservedResource) {
try {
writeLock.lock();
if (commonReserve(node, schedulerKey, rmContainer, reservedResource)) {
attemptResourceUsage.incReserved(node.getPartition(),
reservedResource);
attemptResourceUsage.incReserved(node.getPartition(), reservedResource);
// succeeded
return true;
}
return false;
} finally {
writeLock.unlock();
}
private synchronized boolean commonReserve(SchedulerNode node,
}
private boolean commonReserve(SchedulerNode node,
SchedulerRequestKey schedulerKey, RMContainer rmContainer,
Resource reservedResource) {
try {
@ -423,13 +490,14 @@ public class SchedulerApplicationAttempt implements SchedulableEntity {
return true;
}
public synchronized RMContainer reserve(SchedulerNode node,
public RMContainer reserve(SchedulerNode node,
SchedulerRequestKey schedulerKey, RMContainer rmContainer,
Container container) {
try {
writeLock.lock();
// Create RMContainer if necessary
if (rmContainer == null) {
rmContainer =
new RMContainerImpl(container, getApplicationAttemptId(),
rmContainer = new RMContainerImpl(container, getApplicationAttemptId(),
node.getNodeID(), appSchedulingInfo.getUser(), rmContext);
attemptResourceUsage.incReserved(node.getPartition(),
container.getResource());
@ -445,70 +513,65 @@ public class SchedulerApplicationAttempt implements SchedulableEntity {
commonReserve(node, schedulerKey, rmContainer, container.getResource());
return rmContainer;
} finally {
writeLock.unlock();
}
/**
* Has the application reserved the given <code>node</code> at the
* given <code>priority</code>?
* @param node node to be checked
* @param schedulerKey scheduler key of reserved container
* @return true is reserved, false if not
*/
public synchronized boolean isReserved(SchedulerNode node,
SchedulerRequestKey schedulerKey) {
Map<NodeId, RMContainer> reservedContainers =
this.reservedContainers.get(schedulerKey);
if (reservedContainers != null) {
return reservedContainers.containsKey(node.getNodeID());
}
return false;
}
public synchronized void setHeadroom(Resource globalLimit) {
this.resourceLimit = globalLimit;
public void setHeadroom(Resource globalLimit) {
this.resourceLimit = Resources.componentwiseMax(globalLimit,
Resources.none());
}
/**
* Get available headroom in terms of resources for the application's user.
* @return available resource headroom
*/
public synchronized Resource getHeadroom() {
// Corner case to deal with applications being slightly over-limit
if (resourceLimit.getMemorySize() < 0) {
resourceLimit.setMemorySize(0);
}
public Resource getHeadroom() {
return resourceLimit;
}
public synchronized int getNumReservedContainers(
public int getNumReservedContainers(
SchedulerRequestKey schedulerKey) {
Map<NodeId, RMContainer> reservedContainers =
this.reservedContainers.get(schedulerKey);
return (reservedContainers == null) ? 0 : reservedContainers.size();
try {
readLock.lock();
Map<NodeId, RMContainer> map = this.reservedContainers.get(
schedulerKey);
return (map == null) ? 0 : map.size();
} finally {
readLock.unlock();
}
}
@SuppressWarnings("unchecked")
public synchronized void containerLaunchedOnNode(ContainerId containerId,
public void containerLaunchedOnNode(ContainerId containerId,
NodeId nodeId) {
try {
writeLock.lock();
// Inform the container
RMContainer rmContainer = getRMContainer(containerId);
if (rmContainer == null) {
// Some unknown container sneaked into the system. Kill it.
rmContext.getDispatcher().getEventHandler()
.handle(new RMNodeCleanContainerEvent(nodeId, containerId));
rmContext.getDispatcher().getEventHandler().handle(
new RMNodeCleanContainerEvent(nodeId, containerId));
return;
}
rmContainer.handle(new RMContainerEvent(containerId,
RMContainerEventType.LAUNCHED));
rmContainer.handle(
new RMContainerEvent(containerId, RMContainerEventType.LAUNCHED));
} finally {
writeLock.unlock();
}
}
public synchronized void showRequests() {
public void showRequests() {
if (LOG.isDebugEnabled()) {
try {
readLock.lock();
for (SchedulerRequestKey schedulerKey : getSchedulerKeys()) {
Map<String, ResourceRequest> requests =
getResourceRequests(schedulerKey);
Map<String, ResourceRequest> requests = getResourceRequests(
schedulerKey);
if (requests != null) {
LOG.debug("showRequests:" + " application=" + getApplicationId()
+ " headRoom=" + getHeadroom() + " currentConsumption="
@ -519,6 +582,9 @@ public class SchedulerApplicationAttempt implements SchedulableEntity {
}
}
}
} finally {
readLock.unlock();
}
}
}
@ -572,54 +638,75 @@ public class SchedulerApplicationAttempt implements SchedulableEntity {
// Create container token and update NMToken altogether, if either of them fails for
// some reason like DNS unavailable, do not return this container and keep it
// in the newlyAllocatedContainers waiting to be refetched.
public synchronized List<Container> pullNewlyAllocatedContainers() {
List<Container> returnContainerList =
new ArrayList<Container>(newlyAllocatedContainers.size());
for (Iterator<RMContainer> i = newlyAllocatedContainers.iterator(); i
.hasNext();) {
public List<Container> pullNewlyAllocatedContainers() {
try {
writeLock.lock();
List<Container> returnContainerList = new ArrayList<Container>(
newlyAllocatedContainers.size());
Iterator<RMContainer> i = newlyAllocatedContainers.iterator();
while (i.hasNext()) {
RMContainer rmContainer = i.next();
Container updatedContainer =
updateContainerAndNMToken(rmContainer, true, false);
// Only add container to return list when it's not null. updatedContainer
// could be null when generate token failed, it can be caused by DNS
// resolving failed.
Container updatedContainer = updateContainerAndNMToken(rmContainer,
true, false);
// Only add container to return list when it's not null.
// updatedContainer could be null when generate token failed, it can be
// caused by DNS resolving failed.
if (updatedContainer != null) {
returnContainerList.add(updatedContainer);
i.remove();
}
}
return returnContainerList;
} finally {
writeLock.unlock();
}
private synchronized List<Container> pullNewlyUpdatedContainers(
}
private List<Container> pullNewlyUpdatedContainers(
Map<ContainerId, RMContainer> updatedContainerMap, boolean increase) {
List<Container> returnContainerList =
new ArrayList<Container>(updatedContainerMap.size());
for (Iterator<Entry<ContainerId, RMContainer>> i =
updatedContainerMap.entrySet().iterator(); i.hasNext();) {
try {
writeLock.lock();
List <Container> returnContainerList = new ArrayList <Container>(
updatedContainerMap.size());
Iterator<Entry<ContainerId, RMContainer>> i =
updatedContainerMap.entrySet().iterator();
while (i.hasNext()) {
RMContainer rmContainer = i.next().getValue();
Container updatedContainer =
updateContainerAndNMToken(rmContainer, false, increase);
Container updatedContainer = updateContainerAndNMToken(rmContainer,
false, increase);
if (updatedContainer != null) {
returnContainerList.add(updatedContainer);
i.remove();
}
}
return returnContainerList;
} finally {
writeLock.unlock();
}
public synchronized List<Container> pullNewlyIncreasedContainers() {
}
public List<Container> pullNewlyIncreasedContainers() {
return pullNewlyUpdatedContainers(newlyIncreasedContainers, true);
}
public synchronized List<Container> pullNewlyDecreasedContainers() {
public List<Container> pullNewlyDecreasedContainers() {
return pullNewlyUpdatedContainers(newlyDecreasedContainers, false);
}
public synchronized List<NMToken> pullUpdatedNMTokens() {
List<NMToken> returnList = new ArrayList<NMToken>(updatedNMTokens);
public List<NMToken> pullUpdatedNMTokens() {
try {
writeLock.lock();
List <NMToken> returnList = new ArrayList<>(updatedNMTokens);
updatedNMTokens.clear();
return returnList;
} finally {
writeLock.unlock();
}
}
public boolean isWaitingForAMContainer() {
@ -628,53 +715,63 @@ public class SchedulerApplicationAttempt implements SchedulableEntity {
return (!unmanagedAM && appAttempt.getMasterContainer() == null);
}
public synchronized void updateBlacklist(List<String> blacklistAdditions,
public void updateBlacklist(List<String> blacklistAdditions,
List<String> blacklistRemovals) {
try {
writeLock.lock();
if (!isStopped) {
if (isWaitingForAMContainer()) {
// The request is for the AM-container, and the AM-container is launched
// by the system. So, update the places that are blacklisted by system
// (as opposed to those blacklisted by the application).
// The request is for the AM-container, and the AM-container is
// launched by the system. So, update the places that are blacklisted
// by system (as opposed to those blacklisted by the application).
this.appSchedulingInfo.updatePlacesBlacklistedBySystem(
blacklistAdditions, blacklistRemovals);
} else{
this.appSchedulingInfo.updatePlacesBlacklistedByApp(blacklistAdditions,
blacklistRemovals);
this.appSchedulingInfo.updatePlacesBlacklistedByApp(
blacklistAdditions, blacklistRemovals);
}
}
} finally {
writeLock.unlock();
}
}
public boolean isPlaceBlacklisted(String resourceName) {
try {
readLock.lock();
boolean forAMContainer = isWaitingForAMContainer();
return this.appSchedulingInfo.isPlaceBlacklisted(resourceName,
forAMContainer);
} finally {
readLock.unlock();
}
}
public synchronized int addMissedNonPartitionedRequestSchedulingOpportunity(
public int addMissedNonPartitionedRequestSchedulingOpportunity(
SchedulerRequestKey schedulerKey) {
missedNonPartitionedReqSchedulingOpportunity.add(schedulerKey);
return missedNonPartitionedReqSchedulingOpportunity.count(schedulerKey);
return missedNonPartitionedReqSchedulingOpportunity.add(
schedulerKey, 1) + 1;
}
public synchronized void
public void
resetMissedNonPartitionedRequestSchedulingOpportunity(
SchedulerRequestKey schedulerKey) {
missedNonPartitionedReqSchedulingOpportunity.setCount(schedulerKey, 0);
}
public synchronized void addSchedulingOpportunity(
public void addSchedulingOpportunity(
SchedulerRequestKey schedulerKey) {
int count = schedulingOpportunities.count(schedulerKey);
if (count < Integer.MAX_VALUE) {
schedulingOpportunities.setCount(schedulerKey, count + 1);
try {
schedulingOpportunities.add(schedulerKey, 1);
} catch (IllegalArgumentException e) {
// This happens when count = MAX_INT, ignore the exception
}
}
public synchronized void subtractSchedulingOpportunity(
public void subtractSchedulingOpportunity(
SchedulerRequestKey schedulerKey) {
int count = schedulingOpportunities.count(schedulerKey) - 1;
this.schedulingOpportunities.setCount(schedulerKey, Math.max(count, 0));
this.schedulingOpportunities.removeExactly(schedulerKey, 1);
}
/**
@ -684,7 +781,7 @@ public class SchedulerApplicationAttempt implements SchedulableEntity {
* @param schedulerKey Scheduler Key
* @return number of scheduling opportunities
*/
public synchronized int getSchedulingOpportunities(
public int getSchedulingOpportunities(
SchedulerRequestKey schedulerKey) {
return schedulingOpportunities.count(schedulerKey);
}
@ -696,16 +793,22 @@ public class SchedulerApplicationAttempt implements SchedulableEntity {
*
* @param schedulerKey The priority of the container scheduled.
*/
public synchronized void resetSchedulingOpportunities(
public void resetSchedulingOpportunities(
SchedulerRequestKey schedulerKey) {
resetSchedulingOpportunities(schedulerKey, System.currentTimeMillis());
}
// used for continuous scheduling
public synchronized void resetSchedulingOpportunities(
public void resetSchedulingOpportunities(
SchedulerRequestKey schedulerKey, long currentTimeMs) {
try {
writeLock.lock();
lastScheduledContainer.put(schedulerKey, currentTimeMs);
schedulingOpportunities.setCount(schedulerKey, 0);
} finally {
writeLock.unlock();
}
}
@VisibleForTesting
@ -713,7 +816,7 @@ public class SchedulerApplicationAttempt implements SchedulableEntity {
schedulingOpportunities.setCount(schedulerKey, count);
}
synchronized AggregateAppResourceUsage getRunningAggregateAppResourceUsage() {
private AggregateAppResourceUsage getRunningAggregateAppResourceUsage() {
long currentTimeMillis = System.currentTimeMillis();
// Don't walk the whole container list if the resources were computed
// recently.
@ -737,22 +840,26 @@ public class SchedulerApplicationAttempt implements SchedulableEntity {
return new AggregateAppResourceUsage(lastMemorySeconds, lastVcoreSeconds);
}
public synchronized ApplicationResourceUsageReport getResourceUsageReport() {
public ApplicationResourceUsageReport getResourceUsageReport() {
try {
writeLock.lock();
AggregateAppResourceUsage runningResourceUsage =
getRunningAggregateAppResourceUsage();
Resource usedResourceClone =
Resources.clone(attemptResourceUsage.getAllUsed());
Resource reservedResourceClone =
Resources.clone(attemptResourceUsage.getReserved());
Resource usedResourceClone = Resources.clone(
attemptResourceUsage.getAllUsed());
Resource reservedResourceClone = Resources.clone(
attemptResourceUsage.getReserved());
Resource cluster = rmContext.getScheduler().getClusterResource();
ResourceCalculator calc = rmContext.getScheduler().getResourceCalculator();
ResourceCalculator calc =
rmContext.getScheduler().getResourceCalculator();
float queueUsagePerc = 0.0f;
float clusterUsagePerc = 0.0f;
if (!calc.isInvalidDivisor(cluster)) {
queueUsagePerc =
calc.divide(cluster, usedResourceClone, Resources.multiply(cluster,
queue.getQueueInfo(false, false).getCapacity())) * 100;
clusterUsagePerc = calc.divide(cluster, usedResourceClone, cluster) * 100;
queueUsagePerc = calc.divide(cluster, usedResourceClone, Resources
.multiply(cluster, queue.getQueueInfo(false, false).getCapacity()))
* 100;
clusterUsagePerc = calc.divide(cluster, usedResourceClone, cluster)
* 100;
}
return ApplicationResourceUsageReport.newInstance(liveContainers.size(),
reservedContainers.size(), usedResourceClone, reservedResourceClone,
@ -760,36 +867,43 @@ public class SchedulerApplicationAttempt implements SchedulableEntity {
runningResourceUsage.getMemorySeconds(),
runningResourceUsage.getVcoreSeconds(), queueUsagePerc,
clusterUsagePerc);
} finally {
writeLock.unlock();
}
}
public synchronized Map<ContainerId, RMContainer> getLiveContainersMap() {
@VisibleForTesting
public Map<ContainerId, RMContainer> getLiveContainersMap() {
return this.liveContainers;
}
public synchronized Resource getResourceLimit() {
return this.resourceLimit;
}
public synchronized Map<SchedulerRequestKey, Long>
public Map<SchedulerRequestKey, Long>
getLastScheduledContainer() {
return this.lastScheduledContainer;
}
public synchronized void transferStateFromPreviousAttempt(
public void transferStateFromPreviousAttempt(
SchedulerApplicationAttempt appAttempt) {
try {
writeLock.lock();
this.liveContainers = appAttempt.getLiveContainersMap();
// this.reReservations = appAttempt.reReservations;
this.attemptResourceUsage.copyAllUsed(appAttempt.attemptResourceUsage);
this.resourceLimit = appAttempt.getResourceLimit();
this.setHeadroom(appAttempt.resourceLimit);
// this.currentReservation = appAttempt.currentReservation;
// this.newlyAllocatedContainers = appAttempt.newlyAllocatedContainers;
// this.schedulingOpportunities = appAttempt.schedulingOpportunities;
this.lastScheduledContainer = appAttempt.getLastScheduledContainer();
this.appSchedulingInfo
.transferStateFromPreviousAppSchedulingInfo(appAttempt.appSchedulingInfo);
this.appSchedulingInfo.transferStateFromPreviousAppSchedulingInfo(
appAttempt.appSchedulingInfo);
} finally {
writeLock.unlock();
}
}
public synchronized void move(Queue newQueue) {
public void move(Queue newQueue) {
try {
writeLock.lock();
QueueMetrics oldMetrics = queue.getMetrics();
QueueMetrics newMetrics = newQueue.getMetrics();
String newQueueName = newQueue.getQueueName();
@ -811,10 +925,15 @@ public class SchedulerApplicationAttempt implements SchedulableEntity {
appSchedulingInfo.move(newQueue);
this.queue = newQueue;
} finally {
writeLock.unlock();
}
}
public synchronized void recoverContainer(SchedulerNode node,
public void recoverContainer(SchedulerNode node,
RMContainer rmContainer) {
try {
writeLock.lock();
// recover app scheduling info
appSchedulingInfo.recoverContainer(rmContainer);
@ -824,14 +943,17 @@ public class SchedulerApplicationAttempt implements SchedulableEntity {
LOG.info("SchedulerAttempt " + getApplicationAttemptId()
+ " is recovering container " + rmContainer.getContainerId());
liveContainers.put(rmContainer.getContainerId(), rmContainer);
attemptResourceUsage.incUsed(node.getPartition(), rmContainer
.getContainer().getResource());
attemptResourceUsage.incUsed(node.getPartition(),
rmContainer.getContainer().getResource());
// resourceLimit: updated when LeafQueue#recoverContainer#allocateResource
// is called.
// newlyAllocatedContainers.add(rmContainer);
// schedulingOpportunities
// lastScheduledContainer
} finally {
writeLock.unlock();
}
}
public void incNumAllocatedContainers(NodeType containerType,
@ -915,19 +1037,31 @@ public class SchedulerApplicationAttempt implements SchedulableEntity {
return attemptResourceUsage;
}
public synchronized boolean removeIncreaseRequest(NodeId nodeId,
public boolean removeIncreaseRequest(NodeId nodeId,
SchedulerRequestKey schedulerKey, ContainerId containerId) {
try {
writeLock.lock();
return appSchedulingInfo.removeIncreaseRequest(nodeId, schedulerKey,
containerId);
} finally {
writeLock.unlock();
}
}
public synchronized boolean updateIncreaseRequests(
public boolean updateIncreaseRequests(
List<SchedContainerChangeRequest> increaseRequests) {
try {
writeLock.lock();
return appSchedulingInfo.updateIncreaseRequests(increaseRequests);
} finally {
writeLock.unlock();
}
}
private synchronized void changeContainerResource(
private void changeContainerResource(
SchedContainerChangeRequest changeRequest, boolean increase) {
try {
writeLock.lock();
if (increase) {
appSchedulingInfo.increaseContainer(changeRequest);
} else{
@ -939,8 +1073,8 @@ public class SchedulerApplicationAttempt implements SchedulableEntity {
new RMContainerChangeResourceEvent(changeRequest.getContainerId(),
changeRequest.getTargetCapacity(), increase));
// remove pending and not pulled by AM newly-increased/decreased-containers
// and add the new one
// remove pending and not pulled by AM newly-increased or
// decreased-containers and add the new one
if (increase) {
newlyDecreasedContainers.remove(changeRequest.getContainerId());
newlyIncreasedContainers.put(changeRequest.getContainerId(),
@ -950,14 +1084,17 @@ public class SchedulerApplicationAttempt implements SchedulableEntity {
newlyDecreasedContainers.put(changeRequest.getContainerId(),
changedRMContainer);
}
} finally {
writeLock.unlock();
}
}
public synchronized void decreaseContainer(
public void decreaseContainer(
SchedContainerChangeRequest decreaseRequest) {
changeContainerResource(decreaseRequest, false);
}
public synchronized void increaseContainer(
public void increaseContainer(
SchedContainerChangeRequest increaseRequest) {
changeContainerResource(increaseRequest, true);
}
@ -1025,7 +1162,10 @@ public class SchedulerApplicationAttempt implements SchedulableEntity {
this.isAttemptRecovering = isRecovering;
}
public static enum AMState {
/**
* Different state for Application Master, user can see this state from web UI
*/
public enum AMState {
UNMANAGED("User launched the Application Master, since it's unmanaged. "),
INACTIVATED("Application is added to the scheduler and is not yet activated. "),
ACTIVATED("Application is Activated, waiting for resources to be assigned for AM. "),

View File

@ -251,7 +251,7 @@ public class RegularContainerAllocator extends AbstractContainerAllocator {
return result;
}
public synchronized float getLocalityWaitFactor(
public float getLocalityWaitFactor(
SchedulerRequestKey schedulerKey, int clusterNodes) {
// Estimate: Required unique resources (i.e. hosts + racks)
int requiredResources =

View File

@ -58,7 +58,6 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CSAMCont
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CSAssignment;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityHeadroomProvider;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerContext;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.LeafQueue;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.QueueCapacities;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.SchedulingMode;
@ -98,7 +97,6 @@ public class FiCaSchedulerApp extends SchedulerApplicationAttempt {
* to hold the message if its app doesn't not get container from a node
*/
private String appSkipNodeDiagnostics;
private CapacitySchedulerContext capacitySchedulerContext;
public FiCaSchedulerApp(ApplicationAttemptId applicationAttemptId,
String user, Queue queue, ActiveUsersManager activeUsersManager,
@ -152,15 +150,13 @@ public class FiCaSchedulerApp extends SchedulerApplicationAttempt {
containerAllocator = new ContainerAllocator(this, rc, rmContext,
activitiesManager);
if (scheduler instanceof CapacityScheduler) {
capacitySchedulerContext = (CapacitySchedulerContext) scheduler;
}
}
public synchronized boolean containerCompleted(RMContainer rmContainer,
public boolean containerCompleted(RMContainer rmContainer,
ContainerStatus containerStatus, RMContainerEventType event,
String partition) {
try {
writeLock.lock();
ContainerId containerId = rmContainer.getContainerId();
// Remove from the list of containers
@ -178,9 +174,8 @@ public class FiCaSchedulerApp extends SchedulerApplicationAttempt {
containersToPreempt.remove(containerId);
Resource containerResource = rmContainer.getContainer().getResource();
RMAuditLogger.logSuccess(getUser(),
AuditConstants.RELEASE_CONTAINER, "SchedulerApp",
getApplicationId(), containerId, containerResource);
RMAuditLogger.logSuccess(getUser(), AuditConstants.RELEASE_CONTAINER,
"SchedulerApp", getApplicationId(), containerId, containerResource);
// Update usage metrics
queue.getMetrics().releaseResources(getUser(), 1, containerResource);
@ -190,11 +185,16 @@ public class FiCaSchedulerApp extends SchedulerApplicationAttempt {
lastMemoryAggregateAllocationUpdateTime = -1;
return true;
} finally {
writeLock.unlock();
}
}
public synchronized RMContainer allocate(NodeType type, FiCaSchedulerNode node,
public RMContainer allocate(NodeType type, FiCaSchedulerNode node,
SchedulerRequestKey schedulerKey, ResourceRequest request,
Container container) {
try {
writeLock.lock();
if (isStopped) {
return null;
@ -207,9 +207,9 @@ public class FiCaSchedulerApp extends SchedulerApplicationAttempt {
}
// Create RMContainer
RMContainer rmContainer =
new RMContainerImpl(container, this.getApplicationAttemptId(),
node.getNodeID(), appSchedulingInfo.getUser(), this.rmContext,
RMContainer rmContainer = new RMContainerImpl(container,
this.getApplicationAttemptId(), node.getNodeID(),
appSchedulingInfo.getUser(), this.rmContext,
request.getNodeLabelExpression());
((RMContainerImpl) rmContainer).setQueueName(this.getQueueName());
@ -225,7 +225,8 @@ public class FiCaSchedulerApp extends SchedulerApplicationAttempt {
List<ResourceRequest> resourceRequestList = appSchedulingInfo.allocate(
type, node, schedulerKey, request, container);
attemptResourceUsage.incUsed(node.getPartition(), container.getResource());
attemptResourceUsage.incUsed(node.getPartition(),
container.getResource());
// Update resource requests related to "request" and store in RMContainer
((RMContainerImpl) rmContainer).setResourceRequests(resourceRequestList);
@ -235,20 +236,24 @@ public class FiCaSchedulerApp extends SchedulerApplicationAttempt {
new RMContainerEvent(containerId, RMContainerEventType.START));
if (LOG.isDebugEnabled()) {
LOG.debug("allocate: applicationAttemptId="
+ containerId.getApplicationAttemptId()
+ " container=" + containerId + " host="
LOG.debug("allocate: applicationAttemptId=" + containerId
.getApplicationAttemptId() + " container=" + containerId + " host="
+ container.getNodeId().getHost() + " type=" + type);
}
RMAuditLogger.logSuccess(getUser(),
AuditConstants.ALLOC_CONTAINER, "SchedulerApp",
getApplicationId(), containerId, container.getResource());
RMAuditLogger.logSuccess(getUser(), AuditConstants.ALLOC_CONTAINER,
"SchedulerApp", getApplicationId(), containerId,
container.getResource());
return rmContainer;
} finally {
writeLock.unlock();
}
}
public synchronized boolean unreserve(SchedulerRequestKey schedulerKey,
public boolean unreserve(SchedulerRequestKey schedulerKey,
FiCaSchedulerNode node, RMContainer rmContainer) {
try {
writeLock.lock();
// Cancel increase request (if it has reserved increase request
rmContainer.cancelIncreaseReservation();
@ -264,6 +269,9 @@ public class FiCaSchedulerApp extends SchedulerApplicationAttempt {
return true;
}
return false;
} finally {
writeLock.unlock();
}
}
private boolean internalUnreserve(FiCaSchedulerNode node,
@ -302,34 +310,16 @@ public class FiCaSchedulerApp extends SchedulerApplicationAttempt {
return false;
}
public synchronized float getLocalityWaitFactor(
SchedulerRequestKey schedulerKey, int clusterNodes) {
// Estimate: Required unique resources (i.e. hosts + racks)
int requiredResources =
Math.max(this.getResourceRequests(schedulerKey).size() - 1, 0);
// waitFactor can't be more than '1'
// i.e. no point skipping more than clustersize opportunities
return Math.min(((float)requiredResources / clusterNodes), 1.0f);
}
public synchronized Resource getTotalPendingRequests() {
Resource ret = Resource.newInstance(0, 0);
for (ResourceRequest rr : appSchedulingInfo.getAllResourceRequests()) {
// to avoid double counting we count only "ANY" resource requests
if (ResourceRequest.isAnyLocation(rr.getResourceName())){
Resources.addTo(ret,
Resources.multiply(rr.getCapability(), rr.getNumContainers()));
}
}
return ret;
}
public synchronized void markContainerForPreemption(ContainerId cont) {
public void markContainerForPreemption(ContainerId cont) {
try {
writeLock.lock();
// ignore already completed containers
if (liveContainers.containsKey(cont)) {
containersToPreempt.add(cont);
}
} finally {
writeLock.unlock();
}
}
/**
@ -342,22 +332,22 @@ public class FiCaSchedulerApp extends SchedulerApplicationAttempt {
* @param minimumAllocation
* @return an allocation
*/
public synchronized Allocation getAllocation(ResourceCalculator rc,
public Allocation getAllocation(ResourceCalculator resourceCalculator,
Resource clusterResource, Resource minimumAllocation) {
try {
writeLock.lock();
Set<ContainerId> currentContPreemption = Collections.unmodifiableSet(
new HashSet<ContainerId>(containersToPreempt));
containersToPreempt.clear();
Resource tot = Resource.newInstance(0, 0);
for (ContainerId c : currentContPreemption) {
Resources.addTo(tot,
liveContainers.get(c).getContainer().getResource());
Resources.addTo(tot, liveContainers.get(c).getContainer()
.getResource());
}
int numCont = (int) Math.ceil(
Resources.divide(rc, clusterResource, tot, minimumAllocation));
ResourceRequest rr = ResourceRequest.newInstance(
Priority.UNDEFINED, ResourceRequest.ANY,
minimumAllocation, numCont);
ResourceRequest rr = ResourceRequest.newInstance(Priority.UNDEFINED,
ResourceRequest.ANY, minimumAllocation, numCont);
List<Container> newlyAllocatedContainers = pullNewlyAllocatedContainers();
List<Container> newlyIncreasedContainers = pullNewlyIncreasedContainers();
List<Container> newlyDecreasedContainers = pullNewlyDecreasedContainers();
@ -367,19 +357,25 @@ public class FiCaSchedulerApp extends SchedulerApplicationAttempt {
return new Allocation(newlyAllocatedContainers, headroom, null,
currentContPreemption, Collections.singletonList(rr), updatedNMTokens,
newlyIncreasedContainers, newlyDecreasedContainers);
} finally {
writeLock.unlock();
}
}
synchronized public NodeId getNodeIdToUnreserve(
@VisibleForTesting
public NodeId getNodeIdToUnreserve(
SchedulerRequestKey schedulerKey, Resource resourceNeedUnreserve,
ResourceCalculator rc, Resource clusterResource) {
try {
writeLock.lock();
// first go around make this algorithm simple and just grab first
// reservation that has enough resources
Map<NodeId, RMContainer> reservedContainers = this.reservedContainers
.get(schedulerKey);
Map<NodeId, RMContainer> reservedContainers = this.reservedContainers.get(
schedulerKey);
if ((reservedContainers != null) && (!reservedContainers.isEmpty())) {
for (Map.Entry<NodeId, RMContainer> entry : reservedContainers.entrySet()) {
for (Map.Entry<NodeId, RMContainer> entry : reservedContainers
.entrySet()) {
NodeId nodeId = entry.getKey();
RMContainer reservedContainer = entry.getValue();
if (reservedContainer.hasIncreaseReservation()) {
@ -396,40 +392,55 @@ public class FiCaSchedulerApp extends SchedulerApplicationAttempt {
if (Resources.fitsIn(rc, clusterResource, resourceNeedUnreserve,
reservedResource)) {
if (LOG.isDebugEnabled()) {
LOG.debug("unreserving node with reservation size: "
+ reservedResource
+ " in order to allocate container with size: " + resourceNeedUnreserve);
LOG.debug(
"unreserving node with reservation size: " + reservedResource
+ " in order to allocate container with size: "
+ resourceNeedUnreserve);
}
return nodeId;
}
}
}
return null;
} finally {
writeLock.unlock();
}
}
public synchronized void setHeadroomProvider(
public void setHeadroomProvider(
CapacityHeadroomProvider headroomProvider) {
try {
writeLock.lock();
this.headroomProvider = headroomProvider;
} finally {
writeLock.unlock();
}
public synchronized CapacityHeadroomProvider getHeadroomProvider() {
return headroomProvider;
}
@Override
public synchronized Resource getHeadroom() {
public Resource getHeadroom() {
try {
readLock.lock();
if (headroomProvider != null) {
return headroomProvider.getHeadroom();
}
return super.getHeadroom();
} finally {
readLock.unlock();
}
}
@Override
public synchronized void transferStateFromPreviousAttempt(
public void transferStateFromPreviousAttempt(
SchedulerApplicationAttempt appAttempt) {
try {
writeLock.lock();
super.transferStateFromPreviousAttempt(appAttempt);
this.headroomProvider =
((FiCaSchedulerApp) appAttempt).getHeadroomProvider();
this.headroomProvider = ((FiCaSchedulerApp) appAttempt).headroomProvider;
} finally {
writeLock.unlock();
}
}
public boolean reserveIncreasedContainer(SchedulerRequestKey schedulerKey,
@ -514,9 +525,12 @@ public class FiCaSchedulerApp extends SchedulerApplicationAttempt {
showRequests();
}
synchronized (this) {
try {
writeLock.lock();
return containerAllocator.assignContainers(clusterResource, node,
schedulingMode, currentResourceLimits, reservedContainer);
} finally {
writeLock.unlock();
}
}
@ -624,23 +638,33 @@ public class FiCaSchedulerApp extends SchedulerApplicationAttempt {
* Capacity Scheduler.
*/
@Override
public synchronized ApplicationResourceUsageReport getResourceUsageReport() {
public ApplicationResourceUsageReport getResourceUsageReport() {
try {
// Use write lock here because
// SchedulerApplicationAttempt#getResourceUsageReport updated fields
// TODO: improve this
writeLock.lock();
ApplicationResourceUsageReport report = super.getResourceUsageReport();
Resource cluster = rmContext.getScheduler().getClusterResource();
Resource totalPartitionRes =
rmContext.getNodeLabelManager()
.getResourceByLabel(getAppAMNodePartitionName(), cluster);
ResourceCalculator calc = rmContext.getScheduler().getResourceCalculator();
rmContext.getNodeLabelManager().getResourceByLabel(
getAppAMNodePartitionName(), cluster);
ResourceCalculator calc =
rmContext.getScheduler().getResourceCalculator();
if (!calc.isInvalidDivisor(totalPartitionRes)) {
float queueAbsMaxCapPerPartition =
((AbstractCSQueue) getQueue()).getQueueCapacities()
.getAbsoluteCapacity(getAppAMNodePartitionName());
float queueUsagePerc =
calc.divide(totalPartitionRes, report.getUsedResources(),
Resources.multiply(totalPartitionRes,
queueAbsMaxCapPerPartition)) * 100;
float queueUsagePerc = calc.divide(totalPartitionRes,
report.getUsedResources(),
Resources.multiply(totalPartitionRes, queueAbsMaxCapPerPartition))
* 100;
report.setQueueUsagePercentage(queueUsagePerc);
}
return report;
} finally {
writeLock.unlock();
}
}
}

View File

@ -123,9 +123,10 @@ public class FSAppAttempt extends SchedulerApplicationAttempt
return queue.getMetrics();
}
synchronized public void containerCompleted(RMContainer rmContainer,
public void containerCompleted(RMContainer rmContainer,
ContainerStatus containerStatus, RMContainerEventType event) {
try {
writeLock.lock();
Container container = rmContainer.getContainer();
ContainerId containerId = container.getId();
@ -134,23 +135,18 @@ public class FSAppAttempt extends SchedulerApplicationAttempt
// Inform the container
rmContainer.handle(
new RMContainerFinishedEvent(
containerId,
containerStatus,
event)
);
new RMContainerFinishedEvent(containerId, containerStatus, event));
if (LOG.isDebugEnabled()) {
LOG.debug("Completed container: " + rmContainer.getContainerId() +
" in state: " + rmContainer.getState() + " event:" + event);
LOG.debug("Completed container: " + rmContainer.getContainerId()
+ " in state: " + rmContainer.getState() + " event:" + event);
}
// Remove from the list of containers
liveContainers.remove(rmContainer.getContainerId());
Resource containerResource = rmContainer.getContainer().getResource();
RMAuditLogger.logSuccess(getUser(),
AuditConstants.RELEASE_CONTAINER, "SchedulerApp",
getApplicationId(), containerId, containerResource);
RMAuditLogger.logSuccess(getUser(), AuditConstants.RELEASE_CONTAINER,
"SchedulerApp", getApplicationId(), containerId, containerResource);
// Update usage metrics
queue.getMetrics().releaseResources(getUser(), 1, containerResource);
@ -161,13 +157,19 @@ public class FSAppAttempt extends SchedulerApplicationAttempt
// Clear resource utilization metrics cache.
lastMemoryAggregateAllocationUpdateTime = -1;
} finally {
writeLock.unlock();
}
}
private synchronized void unreserveInternal(
private void unreserveInternal(
SchedulerRequestKey schedulerKey, FSSchedulerNode node) {
Map<NodeId, RMContainer> reservedContainers =
this.reservedContainers.get(schedulerKey);
RMContainer reservedContainer = reservedContainers.remove(node.getNodeID());
try {
writeLock.lock();
Map<NodeId, RMContainer> reservedContainers = this.reservedContainers.get(
schedulerKey);
RMContainer reservedContainer = reservedContainers.remove(
node.getNodeID());
if (reservedContainers.isEmpty()) {
this.reservedContainers.remove(schedulerKey);
}
@ -178,10 +180,15 @@ public class FSAppAttempt extends SchedulerApplicationAttempt
Resource resource = reservedContainer.getContainer().getResource();
this.attemptResourceUsage.decReserved(resource);
LOG.info("Application " + getApplicationId() + " unreserved " + " on node "
LOG.info(
"Application " + getApplicationId() + " unreserved " + " on node "
+ node + ", currently has " + reservedContainers.size()
+ " at priority " + schedulerKey.getPriority() + "; currentReservation "
+ this.attemptResourceUsage.getReserved());
+ " at priority " + schedulerKey.getPriority()
+ "; currentReservation " + this.attemptResourceUsage
.getReserved());
} finally {
writeLock.unlock();
}
}
private void subtractResourcesOnBlacklistedNodes(
@ -239,17 +246,6 @@ public class FSAppAttempt extends SchedulerApplicationAttempt
return headroom;
}
public synchronized float getLocalityWaitFactor(
SchedulerRequestKey schedulerKey, int clusterNodes) {
// Estimate: Required unique resources (i.e. hosts + racks)
int requiredResources =
Math.max(this.getResourceRequests(schedulerKey).size() - 1, 0);
// waitFactor can't be more than '1'
// i.e. no point skipping more than clustersize opportunities
return Math.min(((float)requiredResources / clusterNodes), 1.0f);
}
/**
* Return the level at which we are allowed to schedule containers, given the
* current size of the cluster and thresholds indicating how many nodes to
@ -261,18 +257,25 @@ public class FSAppAttempt extends SchedulerApplicationAttempt
* @param rackLocalityThreshold rackLocalityThreshold
* @return NodeType
*/
public synchronized NodeType getAllowedLocalityLevel(
NodeType getAllowedLocalityLevel(
SchedulerRequestKey schedulerKey, int numNodes,
double nodeLocalityThreshold, double rackLocalityThreshold) {
// upper limit on threshold
if (nodeLocalityThreshold > 1.0) { nodeLocalityThreshold = 1.0; }
if (rackLocalityThreshold > 1.0) { rackLocalityThreshold = 1.0; }
if (nodeLocalityThreshold > 1.0) {
nodeLocalityThreshold = 1.0;
}
if (rackLocalityThreshold > 1.0) {
rackLocalityThreshold = 1.0;
}
// If delay scheduling is not being used, can schedule anywhere
if (nodeLocalityThreshold < 0.0 || rackLocalityThreshold < 0.0) {
return NodeType.OFF_SWITCH;
}
try {
writeLock.lock();
// Default level is NODE_LOCAL
if (!allowedLocalityLevel.containsKey(schedulerKey)) {
allowedLocalityLevel.put(schedulerKey, NodeType.NODE_LOCAL);
@ -282,9 +285,12 @@ public class FSAppAttempt extends SchedulerApplicationAttempt
NodeType allowed = allowedLocalityLevel.get(schedulerKey);
// If level is already most liberal, we're done
if (allowed.equals(NodeType.OFF_SWITCH)) return NodeType.OFF_SWITCH;
if (allowed.equals(NodeType.OFF_SWITCH)) {
return NodeType.OFF_SWITCH;
}
double threshold = allowed.equals(NodeType.NODE_LOCAL) ? nodeLocalityThreshold :
double threshold = allowed.equals(NodeType.NODE_LOCAL) ?
nodeLocalityThreshold :
rackLocalityThreshold;
// Relax locality constraints once we've surpassed threshold.
@ -292,13 +298,15 @@ public class FSAppAttempt extends SchedulerApplicationAttempt
if (allowed.equals(NodeType.NODE_LOCAL)) {
allowedLocalityLevel.put(schedulerKey, NodeType.RACK_LOCAL);
resetSchedulingOpportunities(schedulerKey);
}
else if (allowed.equals(NodeType.RACK_LOCAL)) {
} else if (allowed.equals(NodeType.RACK_LOCAL)) {
allowedLocalityLevel.put(schedulerKey, NodeType.OFF_SWITCH);
resetSchedulingOpportunities(schedulerKey);
}
}
return allowedLocalityLevel.get(schedulerKey);
} finally {
writeLock.unlock();
}
}
/**
@ -311,23 +319,26 @@ public class FSAppAttempt extends SchedulerApplicationAttempt
* @param currentTimeMs currentTimeMs
* @return NodeType
*/
public synchronized NodeType getAllowedLocalityLevelByTime(
NodeType getAllowedLocalityLevelByTime(
SchedulerRequestKey schedulerKey, long nodeLocalityDelayMs,
long rackLocalityDelayMs, long currentTimeMs) {
// if not being used, can schedule anywhere
if (nodeLocalityDelayMs < 0 || rackLocalityDelayMs < 0) {
return NodeType.OFF_SWITCH;
}
try {
writeLock.lock();
// default level is NODE_LOCAL
if (!allowedLocalityLevel.containsKey(schedulerKey)) {
// add the initial time of priority to prevent comparing with FsApp
// startTime and allowedLocalityLevel degrade
lastScheduledContainer.put(schedulerKey, currentTimeMs);
if (LOG.isDebugEnabled()) {
LOG.debug("Init the lastScheduledContainer time, priority: "
+ schedulerKey.getPriority() + ", time: " + currentTimeMs);
LOG.debug(
"Init the lastScheduledContainer time, priority: " + schedulerKey
.getPriority() + ", time: " + currentTimeMs);
}
allowedLocalityLevel.put(schedulerKey, NodeType.NODE_LOCAL);
return NodeType.NODE_LOCAL;
@ -349,7 +360,8 @@ public class FSAppAttempt extends SchedulerApplicationAttempt
}
long thresholdTime = allowed.equals(NodeType.NODE_LOCAL) ?
nodeLocalityDelayMs : rackLocalityDelayMs;
nodeLocalityDelayMs :
rackLocalityDelayMs;
if (waitTime > thresholdTime) {
if (allowed.equals(NodeType.NODE_LOCAL)) {
@ -361,21 +373,27 @@ public class FSAppAttempt extends SchedulerApplicationAttempt
}
}
return allowedLocalityLevel.get(schedulerKey);
} finally {
writeLock.unlock();
}
}
synchronized public RMContainer allocate(NodeType type, FSSchedulerNode node,
public RMContainer allocate(NodeType type, FSSchedulerNode node,
SchedulerRequestKey schedulerKey, ResourceRequest request,
Container reservedContainer) {
RMContainer rmContainer;
Container container;
try {
writeLock.lock();
// Update allowed locality level
NodeType allowed = allowedLocalityLevel.get(schedulerKey);
if (allowed != null) {
if (allowed.equals(NodeType.OFF_SWITCH) &&
(type.equals(NodeType.NODE_LOCAL) ||
type.equals(NodeType.RACK_LOCAL))) {
if (allowed.equals(NodeType.OFF_SWITCH) && (type.equals(
NodeType.NODE_LOCAL) || type.equals(NodeType.RACK_LOCAL))) {
this.resetAllowedLocalityLevel(schedulerKey, type);
}
else if (allowed.equals(NodeType.RACK_LOCAL) &&
type.equals(NodeType.NODE_LOCAL)) {
} else if (allowed.equals(NodeType.RACK_LOCAL) && type.equals(
NodeType.NODE_LOCAL)) {
this.resetAllowedLocalityLevel(schedulerKey, type);
}
}
@ -386,14 +404,14 @@ public class FSAppAttempt extends SchedulerApplicationAttempt
return null;
}
Container container = reservedContainer;
container = reservedContainer;
if (container == null) {
container =
createContainer(node, request.getCapability(), schedulerKey);
container = createContainer(node, request.getCapability(),
schedulerKey);
}
// Create RMContainer
RMContainer rmContainer = new RMContainerImpl(container,
rmContainer = new RMContainerImpl(container,
getApplicationAttemptId(), node.getNodeID(),
appSchedulingInfo.getUser(), rmContext);
((RMContainerImpl) rmContainer).setQueueName(this.getQueueName());
@ -415,14 +433,16 @@ public class FSAppAttempt extends SchedulerApplicationAttempt
new RMContainerEvent(container.getId(), RMContainerEventType.START));
if (LOG.isDebugEnabled()) {
LOG.debug("allocate: applicationAttemptId="
+ container.getId().getApplicationAttemptId()
+ " container=" + container.getId() + " host="
+ container.getNodeId().getHost() + " type=" + type);
LOG.debug("allocate: applicationAttemptId=" + container.getId()
.getApplicationAttemptId() + " container=" + container.getId()
+ " host=" + container.getNodeId().getHost() + " type=" + type);
}
RMAuditLogger.logSuccess(getUser(), AuditConstants.ALLOC_CONTAINER,
"SchedulerApp", getApplicationId(), container.getId(),
container.getResource());
} finally {
writeLock.unlock();
}
RMAuditLogger.logSuccess(getUser(),
AuditConstants.ALLOC_CONTAINER, "SchedulerApp",
getApplicationId(), container.getId(), container.getResource());
return rmContainer;
}
@ -434,19 +454,30 @@ public class FSAppAttempt extends SchedulerApplicationAttempt
* @param schedulerKey Scheduler Key
* @param level NodeType
*/
public synchronized void resetAllowedLocalityLevel(
public void resetAllowedLocalityLevel(
SchedulerRequestKey schedulerKey, NodeType level) {
NodeType old = allowedLocalityLevel.get(schedulerKey);
LOG.info("Raising locality level from " + old + " to " + level + " at " +
" priority " + schedulerKey.getPriority());
allowedLocalityLevel.put(schedulerKey, level);
NodeType old;
try {
writeLock.lock();
old = allowedLocalityLevel.put(schedulerKey, level);
} finally {
writeLock.unlock();
}
LOG.info("Raising locality level from " + old + " to " + level + " at "
+ " priority " + schedulerKey.getPriority());
}
// related methods
public void addPreemption(RMContainer container, long time) {
assert preemptionMap.get(container) == null;
try {
writeLock.lock();
preemptionMap.put(container, time);
Resources.addTo(preemptedResources, container.getAllocatedResource());
} finally {
writeLock.unlock();
}
}
public Long getContainerPreemptionTime(RMContainer container) {
@ -584,22 +615,36 @@ public class FSAppAttempt extends SchedulerApplicationAttempt
getUser(), rmContainer.getContainer().getResource());
}
private synchronized void setReservation(SchedulerNode node) {
String rackName = node.getRackName() == null ? "NULL" : node.getRackName();
private void setReservation(SchedulerNode node) {
String rackName =
node.getRackName() == null ? "NULL" : node.getRackName();
try {
writeLock.lock();
Set<String> rackReservations = reservations.get(rackName);
if (rackReservations == null) {
rackReservations = new HashSet<>();
reservations.put(rackName, rackReservations);
}
rackReservations.add(node.getNodeName());
} finally {
writeLock.unlock();
}
}
private synchronized void clearReservation(SchedulerNode node) {
String rackName = node.getRackName() == null ? "NULL" : node.getRackName();
private void clearReservation(SchedulerNode node) {
String rackName =
node.getRackName() == null ? "NULL" : node.getRackName();
try {
writeLock.lock();
Set<String> rackReservations = reservations.get(rackName);
if (rackReservations != null) {
rackReservations.remove(node.getNodeName());
}
} finally {
writeLock.unlock();
}
}
int getNumReservations(String rackName, boolean isAny) {
@ -737,7 +782,8 @@ public class FSAppAttempt extends SchedulerApplicationAttempt
// For each priority, see if we can schedule a node local, rack local
// or off-switch request. Rack of off-switch requests may be delayed
// (not scheduled) in order to promote better locality.
synchronized (this) {
try {
writeLock.lock();
for (SchedulerRequestKey schedulerKey : keysToTry) {
// Skip it for reserved container, since
// we already check it in isValidReservation.
@ -772,8 +818,8 @@ public class FSAppAttempt extends SchedulerApplicationAttempt
if (rackLocalRequest != null && rackLocalRequest.getNumContainers() != 0
&& localRequest != null && localRequest.getNumContainers() != 0) {
return assignContainer(node, localRequest,
NodeType.NODE_LOCAL, reserved, schedulerKey);
return assignContainer(node, localRequest, NodeType.NODE_LOCAL,
reserved, schedulerKey);
}
if (rackLocalRequest != null && !rackLocalRequest.getRelaxLocality()) {
@ -781,29 +827,31 @@ public class FSAppAttempt extends SchedulerApplicationAttempt
}
if (rackLocalRequest != null && rackLocalRequest.getNumContainers() != 0
&& (allowedLocality.equals(NodeType.RACK_LOCAL) ||
allowedLocality.equals(NodeType.OFF_SWITCH))) {
return assignContainer(node, rackLocalRequest,
NodeType.RACK_LOCAL, reserved, schedulerKey);
&& (allowedLocality.equals(NodeType.RACK_LOCAL) || allowedLocality
.equals(NodeType.OFF_SWITCH))) {
return assignContainer(node, rackLocalRequest, NodeType.RACK_LOCAL,
reserved, schedulerKey);
}
ResourceRequest offSwitchRequest =
getResourceRequest(schedulerKey, ResourceRequest.ANY);
ResourceRequest offSwitchRequest = getResourceRequest(schedulerKey,
ResourceRequest.ANY);
if (offSwitchRequest != null && !offSwitchRequest.getRelaxLocality()) {
continue;
}
if (offSwitchRequest != null &&
offSwitchRequest.getNumContainers() != 0) {
if (!hasNodeOrRackLocalRequests(schedulerKey) ||
allowedLocality.equals(NodeType.OFF_SWITCH)) {
return assignContainer(
node, offSwitchRequest, NodeType.OFF_SWITCH, reserved,
schedulerKey);
if (offSwitchRequest != null
&& offSwitchRequest.getNumContainers() != 0) {
if (!hasNodeOrRackLocalRequests(schedulerKey) || allowedLocality
.equals(NodeType.OFF_SWITCH)) {
return assignContainer(node, offSwitchRequest, NodeType.OFF_SWITCH,
reserved, schedulerKey);
}
}
}
} finally {
writeLock.unlock();
}
return Resources.none();
}
@ -963,14 +1011,17 @@ public class FSAppAttempt extends SchedulerApplicationAttempt
Resources.addTo(demand, getCurrentConsumption());
// Add up outstanding resource requests
synchronized (this) {
try {
writeLock.lock();
for (SchedulerRequestKey k : getSchedulerKeys()) {
ResourceRequest r = getResourceRequest(k, ResourceRequest.ANY);
if (r != null) {
Resources.multiplyAndAddTo(demand,
r.getCapability(), r.getNumContainers());
Resources.multiplyAndAddTo(demand, r.getCapability(),
r.getNumContainers());
}
}
} finally {
writeLock.unlock();
}
}