YARN-3139. Improve locks in AbstractYarnScheduler/CapacityScheduler/FairScheduler. Contributed by Wangda Tan

This commit is contained in:
Jian He 2016-10-04 17:23:13 -07:00
parent 44f48ee96e
commit 31f8da22d0
7 changed files with 1784 additions and 1520 deletions

View File

@ -211,10 +211,7 @@ public class RMServerUtils {
}
/**
* Validate increase/decrease request. This function must be called under
* the queue lock to make sure that the access to container resource is
* atomic. Refer to LeafQueue.decreaseContainer() and
* CapacityScheduelr.updateIncreaseRequests()
* Validate increase/decrease request.
* <pre>
* - Throw exception when any other error happens
* </pre>

View File

@ -28,6 +28,7 @@ import java.util.Set;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@ -72,8 +73,6 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerReco
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeCleanContainerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities.ActivitiesManager;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity
.LeafQueue;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.QueueEntitlement;
import org.apache.hadoop.yarn.util.resource.Resources;
import com.google.common.annotations.VisibleForTesting;
@ -94,7 +93,7 @@ public abstract class AbstractYarnScheduler
protected Resource minimumAllocation;
protected RMContext rmContext;
protected volatile RMContext rmContext;
private volatile Priority maxClusterLevelAppPriority;
@ -112,6 +111,18 @@ public abstract class AbstractYarnScheduler
protected static final Allocation EMPTY_ALLOCATION = new Allocation(
EMPTY_CONTAINER_LIST, Resources.createResource(0), null, null, null);
protected final ReentrantReadWriteLock.ReadLock readLock;
/*
* Use writeLock for any of operations below:
* - queue change (hierarchy / configuration / container allocation)
* - application(add/remove/allocate-container, but not include container
* finish)
* - node (add/remove/change-resource/container-allocation, but not include
* container finish)
*/
protected final ReentrantReadWriteLock.WriteLock writeLock;
/**
* Construct the service.
*
@ -119,6 +130,9 @@ public abstract class AbstractYarnScheduler
*/
public AbstractYarnScheduler(String name) {
super(name);
ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
readLock = lock.readLock();
writeLock = lock.writeLock();
}
@Override
@ -141,6 +155,10 @@ public abstract class AbstractYarnScheduler
return nodeTracker;
}
/*
* YARN-3136 removed synchronized lock for this method for performance
* purposes
*/
public List<Container> getTransferredContainers(
ApplicationAttemptId currentAttempt) {
ApplicationId appId = currentAttempt.getApplicationId();
@ -155,9 +173,8 @@ public abstract class AbstractYarnScheduler
}
Collection<RMContainer> liveContainers =
app.getCurrentAppAttempt().getLiveContainers();
ContainerId amContainerId =
rmContext.getRMApps().get(appId).getCurrentAppAttempt()
.getMasterContainer().getId();
ContainerId amContainerId = rmContext.getRMApps().get(appId)
.getCurrentAppAttempt().getMasterContainer().getId();
for (RMContainer rmContainer : liveContainers) {
if (!rmContainer.getContainerId().equals(amContainerId)) {
containerList.add(rmContainer.getContainer());
@ -211,54 +228,59 @@ public abstract class AbstractYarnScheduler
nodeTracker.setConfiguredMaxAllocation(maximumAllocation);
}
protected synchronized void containerLaunchedOnNode(
protected void containerLaunchedOnNode(
ContainerId containerId, SchedulerNode node) {
try {
readLock.lock();
// Get the application for the finished container
SchedulerApplicationAttempt application =
getCurrentAttemptForContainer(containerId);
SchedulerApplicationAttempt application = getCurrentAttemptForContainer(
containerId);
if (application == null) {
LOG.info("Unknown application " + containerId.getApplicationAttemptId()
.getApplicationId() + " launched container " + containerId
+ " on node: " + node);
this.rmContext.getDispatcher().getEventHandler()
.handle(new RMNodeCleanContainerEvent(node.getNodeID(), containerId));
this.rmContext.getDispatcher().getEventHandler().handle(
new RMNodeCleanContainerEvent(node.getNodeID(), containerId));
return;
}
application.containerLaunchedOnNode(containerId, node.getNodeID());
} finally {
readLock.unlock();
}
}
protected void containerIncreasedOnNode(ContainerId containerId,
SchedulerNode node, Container increasedContainerReportedByNM) {
/*
* No lock is required, as this method is protected by scheduler's writeLock
*/
// Get the application for the finished container
SchedulerApplicationAttempt application =
getCurrentAttemptForContainer(containerId);
SchedulerApplicationAttempt application = getCurrentAttemptForContainer(
containerId);
if (application == null) {
LOG.info("Unknown application "
+ containerId.getApplicationAttemptId().getApplicationId()
+ " increased container " + containerId + " on node: " + node);
this.rmContext.getDispatcher().getEventHandler()
.handle(new RMNodeCleanContainerEvent(node.getNodeID(), containerId));
LOG.info("Unknown application " + containerId.getApplicationAttemptId()
.getApplicationId() + " increased container " + containerId
+ " on node: " + node);
this.rmContext.getDispatcher().getEventHandler().handle(
new RMNodeCleanContainerEvent(node.getNodeID(), containerId));
return;
}
LeafQueue leafQueue = (LeafQueue) application.getQueue();
synchronized (leafQueue) {
RMContainer rmContainer = getRMContainer(containerId);
if (rmContainer == null) {
// Some unknown container sneaked into the system. Kill it.
this.rmContext.getDispatcher().getEventHandler()
.handle(new RMNodeCleanContainerEvent(
node.getNodeID(), containerId));
this.rmContext.getDispatcher().getEventHandler().handle(
new RMNodeCleanContainerEvent(node.getNodeID(), containerId));
return;
}
rmContainer.handle(new RMContainerNMDoneChangeResourceEvent(
containerId, increasedContainerReportedByNM.getResource()));
}
rmContainer.handle(new RMContainerNMDoneChangeResourceEvent(containerId,
increasedContainerReportedByNM.getResource()));
}
public T getApplicationAttempt(ApplicationAttemptId applicationAttemptId) {
SchedulerApplication<T> app =
applications.get(applicationAttemptId.getApplicationId());
SchedulerApplication<T> app = applications.get(
applicationAttemptId.getApplicationId());
return app == null ? null : app.getCurrentAppAttempt();
}
@ -338,17 +360,20 @@ public abstract class AbstractYarnScheduler
}
}
public synchronized void recoverContainersOnNode(
public void recoverContainersOnNode(
List<NMContainerStatus> containerReports, RMNode nm) {
try {
writeLock.lock();
if (!rmContext.isWorkPreservingRecoveryEnabled()
|| containerReports == null
|| (containerReports != null && containerReports.isEmpty())) {
|| containerReports == null || (containerReports != null
&& containerReports.isEmpty())) {
return;
}
for (NMContainerStatus container : containerReports) {
ApplicationId appId =
container.getContainerId().getApplicationAttemptId().getApplicationId();
container.getContainerId().getApplicationAttemptId()
.getApplicationId();
RMApp rmApp = rmContext.getRMApps().get(appId);
if (rmApp == null) {
LOG.error("Skip recovering container " + container
@ -360,8 +385,8 @@ public abstract class AbstractYarnScheduler
SchedulerApplication<T> schedulerApp = applications.get(appId);
if (schedulerApp == null) {
LOG.info("Skip recovering container " + container
+ " for unknown SchedulerApplication. Application current state is "
+ rmApp.getState());
+ " for unknown SchedulerApplication. "
+ "Application current state is " + rmApp.getState());
killOrphanContainerOnNode(nm, container);
continue;
}
@ -373,8 +398,8 @@ public abstract class AbstractYarnScheduler
if (!rmApp.getApplicationSubmissionContext()
.getKeepContainersAcrossApplicationAttempts()) {
// Do not recover containers for stopped attempt or previous attempt.
if (schedulerAttempt.isStopped()
|| !schedulerAttempt.getApplicationAttemptId().equals(
if (schedulerAttempt.isStopped() || !schedulerAttempt
.getApplicationAttemptId().equals(
container.getContainerId().getApplicationAttemptId())) {
LOG.info("Skip recovering container " + container
+ " for already stopped attempt.");
@ -387,8 +412,8 @@ public abstract class AbstractYarnScheduler
RMContainer rmContainer = recoverAndCreateContainer(container, nm);
// recover RMContainer
rmContainer.handle(new RMContainerRecoverEvent(container.getContainerId(),
container));
rmContainer.handle(
new RMContainerRecoverEvent(container.getContainerId(), container));
// recover scheduler node
SchedulerNode schedulerNode = nodeTracker.getNode(nm.getNodeID());
@ -396,8 +421,8 @@ public abstract class AbstractYarnScheduler
// recover queue: update headroom etc.
Queue queue = schedulerAttempt.getQueue();
queue.recoverContainer(
getClusterResource(), schedulerAttempt, rmContainer);
queue.recoverContainer(getClusterResource(), schedulerAttempt,
rmContainer);
// recover scheduler attempt
schedulerAttempt.recoverContainer(schedulerNode, rmContainer);
@ -410,24 +435,26 @@ public abstract class AbstractYarnScheduler
// Mark current running AMContainer's RMContainer based on the master
// container ID stored in AppAttempt.
if (masterContainer != null
&& masterContainer.getId().equals(rmContainer.getContainerId())) {
if (masterContainer != null && masterContainer.getId().equals(
rmContainer.getContainerId())) {
((RMContainerImpl) rmContainer).setAMContainer(true);
}
}
synchronized (schedulerAttempt) {
Set<ContainerId> releases = schedulerAttempt.getPendingRelease();
if (releases.contains(container.getContainerId())) {
if (schedulerAttempt.getPendingRelease().remove(
container.getContainerId())) {
// release the container
rmContainer.handle(new RMContainerFinishedEvent(container
.getContainerId(), SchedulerUtils.createAbnormalContainerStatus(
container.getContainerId(), SchedulerUtils.RELEASED_CONTAINER),
rmContainer.handle(
new RMContainerFinishedEvent(container.getContainerId(),
SchedulerUtils
.createAbnormalContainerStatus(container.getContainerId(),
SchedulerUtils.RELEASED_CONTAINER),
RMContainerEventType.RELEASED));
releases.remove(container.getContainerId());
LOG.info(container.getContainerId() + " is released by application.");
}
}
} finally {
writeLock.unlock();
}
}
@ -492,7 +519,6 @@ public abstract class AbstractYarnScheduler
for (SchedulerApplication<T> app : applications.values()) {
T attempt = app.getCurrentAppAttempt();
if (attempt != null) {
synchronized (attempt) {
for (ContainerId containerId : attempt.getPendingRelease()) {
RMAuditLogger.logFailure(app.getUser(),
AuditConstants.RELEASE_CONTAINER,
@ -505,7 +531,6 @@ public abstract class AbstractYarnScheduler
}
}
}
}
@VisibleForTesting
@Private
@ -558,9 +583,7 @@ public abstract class AbstractYarnScheduler
< nmExpireInterval) {
LOG.info(containerId + " doesn't exist. Add the container"
+ " to the release request cache as it maybe on recovery.");
synchronized (attempt) {
attempt.getPendingRelease().add(containerId);
}
} else {
RMAuditLogger.logFailure(attempt.getUser(),
AuditConstants.RELEASE_CONTAINER,
@ -603,8 +626,10 @@ public abstract class AbstractYarnScheduler
}
@Override
public synchronized void moveAllApps(String sourceQueue, String destQueue)
public void moveAllApps(String sourceQueue, String destQueue)
throws YarnException {
try {
writeLock.lock();
// check if destination queue is a valid leaf queue
try {
getQueueInfo(destQueue, false, false);
@ -615,23 +640,27 @@ public abstract class AbstractYarnScheduler
// check if source queue is a valid
List<ApplicationAttemptId> apps = getAppsInQueue(sourceQueue);
if (apps == null) {
String errMsg = "The specified Queue: " + sourceQueue + " doesn't exist";
String errMsg =
"The specified Queue: " + sourceQueue + " doesn't exist";
LOG.warn(errMsg);
throw new YarnException(errMsg);
}
// generate move events for each pending/running app
for (ApplicationAttemptId app : apps) {
SettableFuture<Object> future = SettableFuture.create();
this.rmContext
.getDispatcher()
.getEventHandler()
.handle(new RMAppMoveEvent(app.getApplicationId(), destQueue, future));
this.rmContext.getDispatcher().getEventHandler().handle(
new RMAppMoveEvent(app.getApplicationId(), destQueue, future));
}
} finally {
writeLock.unlock();
}
}
@Override
public synchronized void killAllAppsInQueue(String queueName)
public void killAllAppsInQueue(String queueName)
throws YarnException {
try {
writeLock.lock();
// check if queue is a valid
List<ApplicationAttemptId> apps = getAppsInQueue(queueName);
if (apps == null) {
@ -641,20 +670,23 @@ public abstract class AbstractYarnScheduler
}
// generate kill events for each pending/running app
for (ApplicationAttemptId app : apps) {
this.rmContext
.getDispatcher()
.getEventHandler()
.handle(new RMAppEvent(app.getApplicationId(), RMAppEventType.KILL,
"Application killed due to expiry of reservation queue " +
queueName + "."));
this.rmContext.getDispatcher().getEventHandler().handle(
new RMAppEvent(app.getApplicationId(), RMAppEventType.KILL,
"Application killed due to expiry of reservation queue "
+ queueName + "."));
}
} finally {
writeLock.unlock();
}
}
/**
* Process resource update on a node.
*/
public synchronized void updateNodeResource(RMNode nm,
public void updateNodeResource(RMNode nm,
ResourceOption resourceOption) {
try {
writeLock.lock();
SchedulerNode node = getSchedulerNode(nm.getNodeID());
Resource newResource = resourceOption.getResource();
Resource oldResource = node.getTotalResource();
@ -664,9 +696,8 @@ public abstract class AbstractYarnScheduler
newResource);
// Log resource change
LOG.info("Update resource on node: " + node.getNodeName()
+ " from: " + oldResource + ", to: "
+ newResource);
LOG.info("Update resource on node: " + node.getNodeName() + " from: "
+ oldResource + ", to: " + newResource);
nodeTracker.removeNode(nm.getNodeID());
@ -679,6 +710,9 @@ public abstract class AbstractYarnScheduler
LOG.warn("Update resource on node: " + node.getNodeName()
+ " with the same resource: " + newResource);
}
} finally {
writeLock.unlock();
}
}
/** {@inheritDoc} */
@ -735,7 +769,7 @@ public abstract class AbstractYarnScheduler
}
@Override
public synchronized void setClusterMaxPriority(Configuration conf)
public void setClusterMaxPriority(Configuration conf)
throws YarnException {
try {
maxClusterLevelAppPriority = getMaxPriorityFromConf(conf);

View File

@ -19,6 +19,7 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
@ -186,7 +187,8 @@ public class SchedulerApplicationAttempt implements SchedulableEntity {
new AppSchedulingInfo(applicationAttemptId, user, queue,
activeUsersManager, rmContext.getEpoch(), attemptResourceUsage);
this.queue = queue;
this.pendingRelease = new HashSet<ContainerId>();
this.pendingRelease = Collections.newSetFromMap(
new ConcurrentHashMap<ContainerId, Boolean>());
this.attemptId = applicationAttemptId;
if (rmContext.getRMApps() != null &&
rmContext.getRMApps()
@ -1191,6 +1193,10 @@ public class SchedulerApplicationAttempt implements SchedulableEntity {
// queue's resource usage for specific partition
}
public ReentrantReadWriteLock.WriteLock getWriteLock() {
return writeLock;
}
@Override
public boolean isRecovering() {
return isAttemptRecovering;

View File

@ -2227,6 +2227,22 @@ public class LeafQueue extends AbstractCSQueue {
}
}
public void updateApplicationPriority(SchedulerApplication<FiCaSchedulerApp> app,
Priority newAppPriority) {
try {
writeLock.lock();
FiCaSchedulerApp attempt = app.getCurrentAppAttempt();
getOrderingPolicy().removeSchedulableEntity(attempt);
// Update new priority in SchedulerApplication
attempt.setPriority(newAppPriority);
getOrderingPolicy().addSchedulableEntity(attempt);
} finally {
writeLock.unlock();
}
}
public OrderingPolicy<FiCaSchedulerApp>
getPendingAppsOrderingPolicy() {
return pendingOrderingPolicy;

View File

@ -23,6 +23,7 @@ import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@ -666,6 +667,9 @@ public class FiCaSchedulerApp extends SchedulerApplicationAttempt {
} finally {
writeLock.unlock();
}
}
public ReentrantReadWriteLock.WriteLock getWriteLock() {
return this.writeLock;
}
}

View File

@ -186,10 +186,13 @@ public class FairScheduler extends
// an app can be reserved on
protected boolean sizeBasedWeight; // Give larger weights to larger jobs
protected boolean continuousSchedulingEnabled; // Continuous Scheduling enabled or not
protected int continuousSchedulingSleepMs; // Sleep time for each pass in continuous scheduling
// Continuous Scheduling enabled or not
protected boolean continuousSchedulingEnabled;
// Sleep time for each pass in continuous scheduling
protected volatile int continuousSchedulingSleepMs;
// Node available resource comparator
private Comparator<FSSchedulerNode> nodeAvailableResourceComparator =
new NodeAvailableResourceComparator(); // Node available resource comparator
new NodeAvailableResourceComparator();
protected double nodeLocalityThreshold; // Cluster threshold for node locality
protected double rackLocalityThreshold; // Cluster threshold for rack locality
protected long nodeLocalityDelayMs; // Delay for node locality
@ -338,7 +341,9 @@ public class FairScheduler extends
* fair shares, deficits, minimum slot allocations, and amount of used and
* required resources per job.
*/
protected synchronized void update() {
protected void update() {
try {
writeLock.lock();
long start = getClock().getTime();
updateStarvationStats(); // Determine if any queues merit preemption
@ -357,17 +362,19 @@ public class FairScheduler extends
if (LOG.isDebugEnabled()) {
if (--updatesToSkipForDebug < 0) {
updatesToSkipForDebug = UPDATE_DEBUG_FREQUENCY;
LOG.debug("Cluster Capacity: " + clusterResource +
" Allocations: " + rootMetrics.getAllocatedResources() +
" Availability: " + Resource.newInstance(
rootMetrics.getAvailableMB(),
rootMetrics.getAvailableVirtualCores()) +
" Demand: " + rootQueue.getDemand());
LOG.debug("Cluster Capacity: " + clusterResource + " Allocations: "
+ rootMetrics.getAllocatedResources() + " Availability: "
+ Resource.newInstance(rootMetrics.getAvailableMB(),
rootMetrics.getAvailableVirtualCores()) + " Demand: " + rootQueue
.getDemand());
}
}
long duration = getClock().getTime() - start;
fsOpDurations.addUpdateCallDuration(duration);
} finally {
writeLock.unlock();
}
}
/**
@ -389,7 +396,9 @@ public class FairScheduler extends
* such queues exist, compute how many tasks of each type need to be preempted
* and then select the right ones using preemptTasks.
*/
protected synchronized void preemptTasksIfNecessary() {
protected void preemptTasksIfNecessary() {
try {
writeLock.lock();
if (!shouldAttemptPreemption()) {
return;
}
@ -407,6 +416,9 @@ public class FairScheduler extends
if (isResourceGreaterThanNone(resToPreempt)) {
preemptResources(resToPreempt);
}
} finally {
writeLock.unlock();
}
}
/**
@ -549,13 +561,14 @@ public class FairScheduler extends
return deficit;
}
public synchronized RMContainerTokenSecretManager
public RMContainerTokenSecretManager
getContainerTokenSecretManager() {
return rmContext.getContainerTokenSecretManager();
}
// synchronized for sizeBasedWeight
public synchronized ResourceWeights getAppWeight(FSAppAttempt app) {
public ResourceWeights getAppWeight(FSAppAttempt app) {
try {
readLock.lock();
double weight = 1.0;
if (sizeBasedWeight) {
// Set weight based on current memory demand
@ -565,6 +578,10 @@ public class FairScheduler extends
ResourceWeights resourceWeights = app.getResourceWeights();
resourceWeights.setWeight((float) weight);
return resourceWeights;
} finally {
readLock.unlock();
}
}
public Resource getIncrementResourceCapability() {
@ -595,7 +612,7 @@ public class FairScheduler extends
return continuousSchedulingEnabled;
}
public synchronized int getContinuousSchedulingSleepMs() {
public int getContinuousSchedulingSleepMs() {
return continuousSchedulingSleepMs;
}
@ -617,30 +634,33 @@ public class FairScheduler extends
* user. This will accept a new app even if the user or queue is above
* configured limits, but the app will not be marked as runnable.
*/
protected synchronized void addApplication(ApplicationId applicationId,
protected void addApplication(ApplicationId applicationId,
String queueName, String user, boolean isAppRecovering) {
if (queueName == null || queueName.isEmpty()) {
String message = "Reject application " + applicationId +
" submitted by user " + user + " with an empty queue name.";
String message =
"Reject application " + applicationId + " submitted by user " + user
+ " with an empty queue name.";
LOG.info(message);
rmContext.getDispatcher().getEventHandler()
.handle(new RMAppEvent(applicationId,
RMAppEventType.APP_REJECTED, message));
rmContext.getDispatcher().getEventHandler().handle(
new RMAppEvent(applicationId, RMAppEventType.APP_REJECTED,
message));
return;
}
if (queueName.startsWith(".") || queueName.endsWith(".")) {
String message = "Reject application " + applicationId
+ " submitted by user " + user + " with an illegal queue name "
+ queueName + ". "
String message =
"Reject application " + applicationId + " submitted by user " + user
+ " with an illegal queue name " + queueName + ". "
+ "The queue name cannot start/end with period.";
LOG.info(message);
rmContext.getDispatcher().getEventHandler()
.handle(new RMAppEvent(applicationId,
RMAppEventType.APP_REJECTED, message));
rmContext.getDispatcher().getEventHandler().handle(
new RMAppEvent(applicationId, RMAppEventType.APP_REJECTED,
message));
return;
}
try {
writeLock.lock();
RMApp rmApp = rmContext.getRMApps().get(applicationId);
FSLeafQueue queue = assignToQueue(rmApp, queueName, user);
if (queue == null) {
@ -648,17 +668,17 @@ public class FairScheduler extends
}
// Enforce ACLs
UserGroupInformation userUgi = UserGroupInformation.createRemoteUser(user);
UserGroupInformation userUgi = UserGroupInformation.createRemoteUser(
user);
if (!queue.hasAccess(QueueACL.SUBMIT_APPLICATIONS, userUgi)
&& !queue.hasAccess(QueueACL.ADMINISTER_QUEUE, userUgi)) {
String msg = "User " + userUgi.getUserName() +
" cannot submit applications to queue " + queue.getName() +
"(requested queuename is " + queueName + ")";
if (!queue.hasAccess(QueueACL.SUBMIT_APPLICATIONS, userUgi) && !queue
.hasAccess(QueueACL.ADMINISTER_QUEUE, userUgi)) {
String msg = "User " + userUgi.getUserName()
+ " cannot submit applications to queue " + queue.getName()
+ "(requested queuename is " + queueName + ")";
LOG.info(msg);
rmContext.getDispatcher().getEventHandler()
.handle(new RMAppEvent(applicationId,
RMAppEventType.APP_REJECTED, msg));
rmContext.getDispatcher().getEventHandler().handle(
new RMAppEvent(applicationId, RMAppEventType.APP_REJECTED, msg));
return;
}
@ -676,30 +696,33 @@ public class FairScheduler extends
+ " is recovering. Skip notifying APP_ACCEPTED");
}
} else{
rmContext.getDispatcher().getEventHandler()
.handle(new RMAppEvent(applicationId, RMAppEventType.APP_ACCEPTED));
rmContext.getDispatcher().getEventHandler().handle(
new RMAppEvent(applicationId, RMAppEventType.APP_ACCEPTED));
}
} finally {
writeLock.unlock();
}
}
/**
* Add a new application attempt to the scheduler.
*/
protected synchronized void addApplicationAttempt(
protected void addApplicationAttempt(
ApplicationAttemptId applicationAttemptId,
boolean transferStateFromPreviousAttempt,
boolean isAttemptRecovering) {
SchedulerApplication<FSAppAttempt> application =
applications.get(applicationAttemptId.getApplicationId());
try {
writeLock.lock();
SchedulerApplication<FSAppAttempt> application = applications.get(
applicationAttemptId.getApplicationId());
String user = application.getUser();
FSLeafQueue queue = (FSLeafQueue) application.getQueue();
FSAppAttempt attempt =
new FSAppAttempt(this, applicationAttemptId, user,
queue, new ActiveUsersManager(getRootQueueMetrics()),
rmContext);
FSAppAttempt attempt = new FSAppAttempt(this, applicationAttemptId, user,
queue, new ActiveUsersManager(getRootQueueMetrics()), rmContext);
if (transferStateFromPreviousAttempt) {
attempt.transferStateFromPreviousAttempt(application
.getCurrentAppAttempt());
attempt.transferStateFromPreviousAttempt(
application.getCurrentAppAttempt());
}
application.setCurrentAppAttempt(attempt);
@ -726,6 +749,9 @@ public class FairScheduler extends
new RMAppAttemptEvent(applicationAttemptId,
RMAppAttemptEventType.ATTEMPT_ADDED));
}
} finally {
writeLock.unlock();
}
}
/**
@ -770,62 +796,60 @@ public class FairScheduler extends
return queue;
}
private synchronized void removeApplication(ApplicationId applicationId,
private void removeApplication(ApplicationId applicationId,
RMAppState finalState) {
SchedulerApplication<FSAppAttempt> application =
applications.get(applicationId);
SchedulerApplication<FSAppAttempt> application = applications.remove(
applicationId);
if (application == null) {
LOG.warn("Couldn't find application " + applicationId);
return;
}
} else{
application.stop(finalState);
applications.remove(applicationId);
}
}
private synchronized void removeApplicationAttempt(
private void removeApplicationAttempt(
ApplicationAttemptId applicationAttemptId,
RMAppAttemptState rmAppAttemptFinalState, boolean keepContainers) {
LOG.info("Application " + applicationAttemptId + " is done." +
" finalState=" + rmAppAttemptFinalState);
SchedulerApplication<FSAppAttempt> application =
applications.get(applicationAttemptId.getApplicationId());
FSAppAttempt attempt = getSchedulerApp(applicationAttemptId);
try {
writeLock.lock();
LOG.info(
"Application " + applicationAttemptId + " is done." + " finalState="
+ rmAppAttemptFinalState);
FSAppAttempt attempt = getApplicationAttempt(applicationAttemptId);
if (attempt == null || application == null) {
LOG.info("Unknown application " + applicationAttemptId + " has completed!");
if (attempt == null) {
LOG.info(
"Unknown application " + applicationAttemptId + " has completed!");
return;
}
// Release all the running containers
for (RMContainer rmContainer : attempt.getLiveContainers()) {
if (keepContainers
&& rmContainer.getState().equals(RMContainerState.RUNNING)) {
if (keepContainers && rmContainer.getState().equals(
RMContainerState.RUNNING)) {
// do not kill the running container in the case of work-preserving AM
// restart.
LOG.info("Skip killing " + rmContainer.getContainerId());
continue;
}
super.completedContainer(rmContainer,
SchedulerUtils.createAbnormalContainerStatus(
rmContainer.getContainerId(),
super.completedContainer(rmContainer, SchedulerUtils
.createAbnormalContainerStatus(rmContainer.getContainerId(),
SchedulerUtils.COMPLETED_APPLICATION),
RMContainerEventType.KILL);
}
// Release all reserved containers
for (RMContainer rmContainer : attempt.getReservedContainers()) {
super.completedContainer(rmContainer,
SchedulerUtils.createAbnormalContainerStatus(
rmContainer.getContainerId(),
"Application Complete"),
RMContainerEventType.KILL);
super.completedContainer(rmContainer, SchedulerUtils
.createAbnormalContainerStatus(rmContainer.getContainerId(),
"Application Complete"), RMContainerEventType.KILL);
}
// Clean up pending requests, metrics etc.
attempt.stop(rmAppAttemptFinalState);
// Inform the queue
FSLeafQueue queue = queueMgr.getLeafQueue(attempt.getQueue()
.getQueueName(), false);
FSLeafQueue queue = queueMgr.getLeafQueue(
attempt.getQueue().getQueueName(), false);
boolean wasRunnable = queue.removeApp(attempt);
if (wasRunnable) {
@ -835,27 +859,31 @@ public class FairScheduler extends
} else{
maxRunningEnforcer.untrackNonRunnableApp(attempt);
}
} finally {
writeLock.unlock();
}
}
/**
* Clean up a completed container.
*/
@Override
protected synchronized void completedContainerInternal(
protected void completedContainerInternal(
RMContainer rmContainer, ContainerStatus containerStatus,
RMContainerEventType event) {
try {
writeLock.lock();
Container container = rmContainer.getContainer();
// Get the application for the finished container
FSAppAttempt application =
getCurrentAttemptForContainer(container.getId());
FSAppAttempt application = getCurrentAttemptForContainer(
container.getId());
ApplicationId appId =
container.getId().getApplicationAttemptId().getApplicationId();
if (application == null) {
LOG.info("Container " + container + " of" +
" finished application " + appId +
" completed with event " + event);
LOG.info(
"Container " + container + " of" + " finished application " + appId
+ " completed with event " + event);
return;
}
@ -875,11 +903,17 @@ public class FairScheduler extends
+ " released container " + container.getId() + " on node: " + node
+ " with event: " + event);
}
} finally {
writeLock.unlock();
}
}
private synchronized void addNode(List<NMContainerStatus> containerReports,
private void addNode(List<NMContainerStatus> containerReports,
RMNode node) {
FSSchedulerNode schedulerNode = new FSSchedulerNode(node, usePortForNodeName);
try {
writeLock.lock();
FSSchedulerNode schedulerNode = new FSSchedulerNode(node,
usePortForNodeName);
nodeTracker.addNode(schedulerNode);
triggerUpdate();
@ -887,14 +921,19 @@ public class FairScheduler extends
Resource clusterResource = getClusterResource();
queueMgr.getRootQueue().setSteadyFairShare(clusterResource);
queueMgr.getRootQueue().recomputeSteadyShares();
LOG.info("Added node " + node.getNodeAddress() +
" cluster capacity: " + clusterResource);
LOG.info("Added node " + node.getNodeAddress() + " cluster capacity: "
+ clusterResource);
recoverContainersOnNode(containerReports, node);
updateRootQueueMetrics();
} finally {
writeLock.unlock();
}
}
private synchronized void removeNode(RMNode rmNode) {
private void removeNode(RMNode rmNode) {
try {
writeLock.lock();
NodeId nodeId = rmNode.getNodeID();
FSSchedulerNode node = nodeTracker.getNode(nodeId);
if (node == null) {
@ -906,21 +945,17 @@ public class FairScheduler extends
List<RMContainer> runningContainers =
node.getCopiedListOfRunningContainers();
for (RMContainer container : runningContainers) {
super.completedContainer(container,
SchedulerUtils.createAbnormalContainerStatus(
container.getContainerId(),
SchedulerUtils.LOST_CONTAINER),
RMContainerEventType.KILL);
super.completedContainer(container, SchedulerUtils
.createAbnormalContainerStatus(container.getContainerId(),
SchedulerUtils.LOST_CONTAINER), RMContainerEventType.KILL);
}
// Remove reservations, if any
RMContainer reservedContainer = node.getReservedContainer();
if (reservedContainer != null) {
super.completedContainer(reservedContainer,
SchedulerUtils.createAbnormalContainerStatus(
reservedContainer.getContainerId(),
SchedulerUtils.LOST_CONTAINER),
RMContainerEventType.KILL);
super.completedContainer(reservedContainer, SchedulerUtils
.createAbnormalContainerStatus(reservedContainer.getContainerId(),
SchedulerUtils.LOST_CONTAINER), RMContainerEventType.KILL);
}
nodeTracker.removeNode(nodeId);
@ -930,8 +965,11 @@ public class FairScheduler extends
updateRootQueueMetrics();
triggerUpdate();
LOG.info("Removed node " + rmNode.getNodeAddress() +
" cluster capacity: " + clusterResource);
LOG.info("Removed node " + rmNode.getNodeAddress() + " cluster capacity: "
+ clusterResource);
} finally {
writeLock.unlock();
}
}
@Override
@ -960,12 +998,13 @@ public class FairScheduler extends
// Release containers
releaseContainers(release, application);
synchronized (application) {
try {
application.getWriteLock().lock();
if (!ask.isEmpty()) {
if (LOG.isDebugEnabled()) {
LOG.debug("allocate: pre-update" +
" applicationAttemptId=" + appAttemptId +
" application=" + application.getApplicationId());
LOG.debug(
"allocate: pre-update" + " applicationAttemptId=" + appAttemptId
+ " application=" + application.getApplicationId());
}
application.showRequests();
@ -974,12 +1013,15 @@ public class FairScheduler extends
application.showRequests();
}
} finally {
application.getWriteLock().unlock();
}
if (LOG.isDebugEnabled()) {
LOG.debug("allocate: post-update" +
" applicationAttemptId=" + appAttemptId +
" #ask=" + ask.size() +
" reservation= " + application.getCurrentReservation());
LOG.debug(
"allocate: post-update" + " applicationAttemptId=" + appAttemptId
+ " #ask=" + ask.size() + " reservation= " + application
.getCurrentReservation());
LOG.debug("Preempting " + application.getPreemptionContainers().size()
+ " container(s)");
@ -1002,27 +1044,32 @@ public class FairScheduler extends
Resource headroom = application.getHeadroom();
application.setApplicationHeadroomForMetrics(headroom);
return new Allocation(newlyAllocatedContainers, headroom,
preemptionContainerIds, null, null, application.pullUpdatedNMTokens());
}
preemptionContainerIds, null, null,
application.pullUpdatedNMTokens());
}
/**
* Process a heartbeat update from a node.
*/
private synchronized void nodeUpdate(RMNode nm) {
private void nodeUpdate(RMNode nm) {
try {
writeLock.lock();
long start = getClock().getTime();
if (LOG.isDebugEnabled()) {
LOG.debug("nodeUpdate: " + nm +
" cluster capacity: " + getClusterResource());
LOG.debug(
"nodeUpdate: " + nm + " cluster capacity: " + getClusterResource());
}
eventLog.log("HEARTBEAT", nm.getHostName());
FSSchedulerNode node = getFSSchedulerNode(nm.getNodeID());
List<UpdatedContainerInfo> containerInfoList = nm.pullContainerUpdates();
List<ContainerStatus> newlyLaunchedContainers = new ArrayList<ContainerStatus>();
List<ContainerStatus> completedContainers = new ArrayList<ContainerStatus>();
List<ContainerStatus> newlyLaunchedContainers =
new ArrayList<ContainerStatus>();
List<ContainerStatus> completedContainers =
new ArrayList<ContainerStatus>();
for (UpdatedContainerInfo containerInfo : containerInfoList) {
newlyLaunchedContainers.addAll(containerInfo.getNewlyLaunchedContainers());
newlyLaunchedContainers.addAll(
containerInfo.getNewlyLaunchedContainers());
completedContainers.addAll(containerInfo.getCompletedContainers());
}
// Processing the newly launched containers
@ -1042,13 +1089,11 @@ public class FairScheduler extends
// resource equal to the used resource, so no available resource to
// schedule.
if (nm.getState() == NodeState.DECOMMISSIONING) {
this.rmContext
.getDispatcher()
.getEventHandler()
.handle(
this.rmContext.getDispatcher().getEventHandler().handle(
new RMNodeResourceUpdateEvent(nm.getNodeID(), ResourceOption
.newInstance(getSchedulerNode(nm.getNodeID())
.getAllocatedResource(), 0)));
.newInstance(
getSchedulerNode(nm.getNodeID()).getAllocatedResource(),
0)));
}
if (continuousSchedulingEnabled) {
@ -1066,6 +1111,9 @@ public class FairScheduler extends
long duration = getClock().getTime() - start;
fsOpDurations.addNodeUpdateDuration(duration);
} finally {
writeLock.unlock();
}
}
void continuousSchedulingAttempt() throws InterruptedException {
@ -1126,9 +1174,11 @@ public class FairScheduler extends
}
@VisibleForTesting
synchronized void attemptScheduling(FSSchedulerNode node) {
if (rmContext.isWorkPreservingRecoveryEnabled()
&& !rmContext.isSchedulerReadyForAllocatingContainers()) {
void attemptScheduling(FSSchedulerNode node) {
try {
writeLock.lock();
if (rmContext.isWorkPreservingRecoveryEnabled() && !rmContext
.isSchedulerReadyForAllocatingContainers()) {
return;
}
@ -1136,8 +1186,8 @@ public class FairScheduler extends
if (!nodeTracker.exists(nodeID)) {
// The node might have just been removed while this thread was waiting
// on the synchronized lock before it entered this synchronized method
LOG.info("Skipping scheduling as the node " + nodeID +
" has been removed");
LOG.info(
"Skipping scheduling as the node " + nodeID + " has been removed");
return;
}
@ -1154,8 +1204,8 @@ public class FairScheduler extends
// No reservation, schedule at queue which is farthest below fair share
int assignedContainers = 0;
Resource assignedResource = Resources.clone(Resources.none());
Resource maxResourcesToAssign =
Resources.multiply(node.getUnallocatedResource(), 0.5f);
Resource maxResourcesToAssign = Resources.multiply(
node.getUnallocatedResource(), 0.5f);
while (node.getReservedContainer() == null) {
boolean assignedContainer = false;
Resource assignment = queueMgr.getRootQueue().assignContainer(node);
@ -1164,14 +1214,19 @@ public class FairScheduler extends
assignedContainer = true;
Resources.addTo(assignedResource, assignment);
}
if (!assignedContainer) { break; }
if (!shouldContinueAssigning(assignedContainers,
maxResourcesToAssign, assignedResource)) {
if (!assignedContainer) {
break;
}
if (!shouldContinueAssigning(assignedContainers, maxResourcesToAssign,
assignedResource)) {
break;
}
}
}
updateRootQueueMetrics();
} finally {
writeLock.unlock();
}
}
public FSAppAttempt getSchedulerApp(ApplicationAttemptId appAttemptId) {
@ -1314,9 +1369,11 @@ public class FairScheduler extends
}
}
private synchronized String resolveReservationQueueName(String queueName,
private String resolveReservationQueueName(String queueName,
ApplicationId applicationId, ReservationId reservationID,
boolean isRecovering) {
try {
readLock.lock();
FSQueue queue = queueMgr.getQueue(queueName);
if ((queue == null) || !allocConf.isReservable(queue.getQueueName())) {
return queueName;
@ -1332,14 +1389,12 @@ public class FairScheduler extends
// move to the default child queue of the plan
return getDefaultQueueForPlanQueue(queueName);
}
String message =
"Application "
+ applicationId
+ " submitted to a reservation which is not yet currently active: "
+ resQName;
this.rmContext.getDispatcher().getEventHandler()
.handle(new RMAppEvent(applicationId,
RMAppEventType.APP_REJECTED, message));
String message = "Application " + applicationId
+ " submitted to a reservation which is not yet "
+ "currently active: " + resQName;
this.rmContext.getDispatcher().getEventHandler().handle(
new RMAppEvent(applicationId, RMAppEventType.APP_REJECTED,
message));
return null;
}
if (!queue.getParent().getQueueName().equals(queueName)) {
@ -1347,9 +1402,9 @@ public class FairScheduler extends
"Application: " + applicationId + " submitted to a reservation "
+ resQName + " which does not belong to the specified queue: "
+ queueName;
this.rmContext.getDispatcher().getEventHandler()
.handle(new RMAppEvent(applicationId,
RMAppEventType.APP_REJECTED, message));
this.rmContext.getDispatcher().getEventHandler().handle(
new RMAppEvent(applicationId, RMAppEventType.APP_REJECTED,
message));
return null;
}
// use the reservation queue to run the app
@ -1359,6 +1414,10 @@ public class FairScheduler extends
queueName = getDefaultQueueForPlanQueue(queueName);
}
return queueName;
} finally {
readLock.unlock();
}
}
private String getDefaultQueueForPlanQueue(String queueName) {
@ -1372,12 +1431,13 @@ public class FairScheduler extends
// NOT IMPLEMENTED
}
public synchronized void setRMContext(RMContext rmContext) {
public void setRMContext(RMContext rmContext) {
this.rmContext = rmContext;
}
private void initScheduler(Configuration conf) throws IOException {
synchronized (this) {
try {
writeLock.lock();
this.conf = new FairSchedulerConfiguration(conf);
validateConf(this.conf);
minimumAllocation = this.conf.getMinimumAllocation();
@ -1385,8 +1445,7 @@ public class FairScheduler extends
incrAllocation = this.conf.getIncrementAllocation();
updateReservationThreshold();
continuousSchedulingEnabled = this.conf.isContinuousSchedulingEnabled();
continuousSchedulingSleepMs =
this.conf.getContinuousSchedulingSleepMs();
continuousSchedulingSleepMs = this.conf.getContinuousSchedulingSleepMs();
nodeLocalityThreshold = this.conf.getLocalityThresholdNode();
rackLocalityThreshold = this.conf.getLocalityThresholdRack();
nodeLocalityDelayMs = this.conf.getLocalityDelayNodeMs();
@ -1407,8 +1466,8 @@ public class FairScheduler extends
if (updateInterval < 0) {
updateInterval = FairSchedulerConfiguration.DEFAULT_UPDATE_INTERVAL_MS;
LOG.warn(FairSchedulerConfiguration.UPDATE_INTERVAL_MS
+ " is invalid, so using default value " +
+FairSchedulerConfiguration.DEFAULT_UPDATE_INTERVAL_MS
+ " is invalid, so using default value "
+ +FairSchedulerConfiguration.DEFAULT_UPDATE_INTERVAL_MS
+ " ms instead");
}
@ -1416,8 +1475,7 @@ public class FairScheduler extends
fsOpDurations = FSOpDurations.getInstance(true);
// This stores per-application scheduling information
this.applications = new ConcurrentHashMap<
ApplicationId, SchedulerApplication<FSAppAttempt>>();
this.applications = new ConcurrentHashMap<>();
this.eventLog = new FairSchedulerEventLog();
eventLog.init(this.conf);
@ -1438,6 +1496,8 @@ public class FairScheduler extends
schedulingThread.setName("FairSchedulerContinuousScheduling");
schedulingThread.setDaemon(true);
}
} finally {
writeLock.unlock();
}
allocsLoader.init(conf);
@ -1460,15 +1520,21 @@ public class FairScheduler extends
reservationThreshold = newThreshold;
}
private synchronized void startSchedulerThreads() {
private void startSchedulerThreads() {
try {
writeLock.lock();
Preconditions.checkNotNull(updateThread, "updateThread is null");
Preconditions.checkNotNull(allocsLoader, "allocsLoader is null");
updateThread.start();
if (continuousSchedulingEnabled) {
Preconditions.checkNotNull(schedulingThread, "schedulingThread is null");
Preconditions.checkNotNull(schedulingThread,
"schedulingThread is null");
schedulingThread.start();
}
allocsLoader.start();
} finally {
writeLock.unlock();
}
}
@Override
@ -1485,7 +1551,8 @@ public class FairScheduler extends
@Override
public void serviceStop() throws Exception {
synchronized (this) {
try {
writeLock.lock();
if (updateThread != null) {
updateThread.interrupt();
updateThread.join(THREAD_JOIN_TIMEOUT_MS);
@ -1499,6 +1566,8 @@ public class FairScheduler extends
if (allocsLoader != null) {
allocsLoader.stop();
}
} finally {
writeLock.unlock();
}
super.serviceStop();
@ -1542,17 +1611,22 @@ public class FairScheduler extends
}
@Override
public synchronized boolean checkAccess(UserGroupInformation callerUGI,
public boolean checkAccess(UserGroupInformation callerUGI,
QueueACL acl, String queueName) {
try {
readLock.lock();
FSQueue queue = getQueueManager().getQueue(queueName);
if (queue == null) {
if (LOG.isDebugEnabled()) {
LOG.debug("ACL not found for queue access-type " + acl
+ " for queue " + queueName);
LOG.debug("ACL not found for queue access-type " + acl + " for queue "
+ queueName);
}
return false;
}
return queue.hasAccess(acl, callerUGI);
} finally {
readLock.unlock();
}
}
public AllocationConfiguration getAllocationConfiguration() {
@ -1566,12 +1640,16 @@ public class FairScheduler extends
public void onReload(AllocationConfiguration queueInfo) {
// Commit the reload; also create any queue defined in the alloc file
// if it does not already exist, so it can be displayed on the web UI.
synchronized (FairScheduler.this) {
writeLock.lock();
try {
allocConf = queueInfo;
allocConf.getDefaultSchedulingPolicy().initialize(getClusterResource());
queueMgr.updateAllocationConfiguration(allocConf);
applyChildDefaults();
maxRunningEnforcer.updateRunnabilityOnReload();
} finally {
writeLock.unlock();
}
}
}
@ -1616,15 +1694,19 @@ public class FairScheduler extends
}
@Override
public synchronized String moveApplication(ApplicationId appId,
public String moveApplication(ApplicationId appId,
String queueName) throws YarnException {
try {
writeLock.lock();
SchedulerApplication<FSAppAttempt> app = applications.get(appId);
if (app == null) {
throw new YarnException("App to be moved " + appId + " not found.");
}
FSAppAttempt attempt = (FSAppAttempt) app.getCurrentAppAttempt();
// To serialize with FairScheduler#allocate, synchronize on app attempt
synchronized (attempt) {
try {
attempt.getWriteLock().lock();
FSLeafQueue oldQueue = (FSLeafQueue) app.getQueue();
String destQueueName = handleMoveToPlanQueue(queueName);
FSLeafQueue targetQueue = queueMgr.getLeafQueue(destQueueName, false);
@ -1642,6 +1724,11 @@ public class FairScheduler extends
executeMove(app, attempt, oldQueue, targetQueue);
return targetQueue.getQueueName();
} finally {
attempt.getWriteLock().unlock();
}
} finally {
writeLock.unlock();
}
}
@ -1737,12 +1824,17 @@ public class FairScheduler extends
* Process resource update on a node and update Queue.
*/
@Override
public synchronized void updateNodeResource(RMNode nm,
public void updateNodeResource(RMNode nm,
ResourceOption resourceOption) {
try {
writeLock.lock();
super.updateNodeResource(nm, resourceOption);
updateRootQueueMetrics();
queueMgr.getRootQueue().setSteadyFairShare(getClusterResource());
queueMgr.getRootQueue().recomputeSteadyShares();
} finally {
writeLock.unlock();
}
}
/** {@inheritDoc} */