diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMServerUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMServerUtils.java
index b90e499601d..b2a085a479f 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMServerUtils.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMServerUtils.java
@@ -211,10 +211,7 @@ public class RMServerUtils {
}
/**
- * Validate increase/decrease request. This function must be called under
- * the queue lock to make sure that the access to container resource is
- * atomic. Refer to LeafQueue.decreaseContainer() and
- * CapacityScheduelr.updateIncreaseRequests()
+ * Validate increase/decrease request.
*
* - Throw exception when any other error happens
*
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java
index 45415de7e98..645e06dbf54 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java
@@ -28,6 +28,7 @@ import java.util.Set;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -72,8 +73,6 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerReco
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeCleanContainerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities.ActivitiesManager;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity
- .LeafQueue;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.QueueEntitlement;
import org.apache.hadoop.yarn.util.resource.Resources;
import com.google.common.annotations.VisibleForTesting;
@@ -94,7 +93,7 @@ public abstract class AbstractYarnScheduler
protected Resource minimumAllocation;
- protected RMContext rmContext;
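+  // volatile: rmContext can be re-assigned via setRMContext() without holding
+  // the scheduler's read/write lock, so readers need the visibility guarantee.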
+ protected volatile RMContext rmContext;
private volatile Priority maxClusterLevelAppPriority;
@@ -112,6 +111,18 @@ public abstract class AbstractYarnScheduler
protected static final Allocation EMPTY_ALLOCATION = new Allocation(
EMPTY_CONTAINER_LIST, Resources.createResource(0), null, null, null);
+ protected final ReentrantReadWriteLock.ReadLock readLock;
+
+ /*
+   * Use writeLock for any of the operations below:
+   * - queue changes (hierarchy / configuration / container allocation)
+   * - application changes (add/remove/allocate-container, but not container
+   *   finish)
+   * - node changes (add/remove/change-resource/container-allocation, but not
+   *   container finish)
+ */
+ protected final ReentrantReadWriteLock.WriteLock writeLock;
+
/**
* Construct the service.
*
@@ -119,6 +130,9 @@ public abstract class AbstractYarnScheduler
*/
public AbstractYarnScheduler(String name) {
super(name);
+ ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
+ readLock = lock.readLock();
+ writeLock = lock.writeLock();
}
@Override
@@ -141,6 +155,10 @@ public abstract class AbstractYarnScheduler
return nodeTracker;
}
+ /*
+   * YARN-3136 removed the synchronized lock from this method for performance
+   * reasons.
+ */
   public List<Container> getTransferredContainers(
ApplicationAttemptId currentAttempt) {
ApplicationId appId = currentAttempt.getApplicationId();
@@ -155,9 +173,8 @@ public abstract class AbstractYarnScheduler
}
     Collection<RMContainer> liveContainers =
app.getCurrentAppAttempt().getLiveContainers();
- ContainerId amContainerId =
- rmContext.getRMApps().get(appId).getCurrentAppAttempt()
- .getMasterContainer().getId();
+ ContainerId amContainerId = rmContext.getRMApps().get(appId)
+ .getCurrentAppAttempt().getMasterContainer().getId();
for (RMContainer rmContainer : liveContainers) {
if (!rmContainer.getContainerId().equals(amContainerId)) {
containerList.add(rmContainer.getContainer());
@@ -211,54 +228,59 @@ public abstract class AbstractYarnScheduler
nodeTracker.setConfiguredMaxAllocation(maximumAllocation);
}
- protected synchronized void containerLaunchedOnNode(
+ protected void containerLaunchedOnNode(
ContainerId containerId, SchedulerNode node) {
- // Get the application for the finished container
- SchedulerApplicationAttempt application =
- getCurrentAttemptForContainer(containerId);
- if (application == null) {
- LOG.info("Unknown application " + containerId.getApplicationAttemptId()
- .getApplicationId() + " launched container " + containerId
- + " on node: " + node);
- this.rmContext.getDispatcher().getEventHandler()
- .handle(new RMNodeCleanContainerEvent(node.getNodeID(), containerId));
- return;
- }
+ try {
+ readLock.lock();
+ // Get the application for the finished container
+ SchedulerApplicationAttempt application = getCurrentAttemptForContainer(
+ containerId);
+ if (application == null) {
+ LOG.info("Unknown application " + containerId.getApplicationAttemptId()
+ .getApplicationId() + " launched container " + containerId
+ + " on node: " + node);
+ this.rmContext.getDispatcher().getEventHandler().handle(
+ new RMNodeCleanContainerEvent(node.getNodeID(), containerId));
+ return;
+ }
- application.containerLaunchedOnNode(containerId, node.getNodeID());
+ application.containerLaunchedOnNode(containerId, node.getNodeID());
+ } finally {
+ readLock.unlock();
+ }
}
protected void containerIncreasedOnNode(ContainerId containerId,
SchedulerNode node, Container increasedContainerReportedByNM) {
+ /*
+     * No lock is required, as this method is protected by the scheduler's
+     * writeLock.
+ */
// Get the application for the finished container
- SchedulerApplicationAttempt application =
- getCurrentAttemptForContainer(containerId);
+ SchedulerApplicationAttempt application = getCurrentAttemptForContainer(
+ containerId);
if (application == null) {
- LOG.info("Unknown application "
- + containerId.getApplicationAttemptId().getApplicationId()
- + " increased container " + containerId + " on node: " + node);
- this.rmContext.getDispatcher().getEventHandler()
- .handle(new RMNodeCleanContainerEvent(node.getNodeID(), containerId));
+ LOG.info("Unknown application " + containerId.getApplicationAttemptId()
+ .getApplicationId() + " increased container " + containerId
+ + " on node: " + node);
+ this.rmContext.getDispatcher().getEventHandler().handle(
+ new RMNodeCleanContainerEvent(node.getNodeID(), containerId));
return;
}
- LeafQueue leafQueue = (LeafQueue) application.getQueue();
- synchronized (leafQueue) {
- RMContainer rmContainer = getRMContainer(containerId);
- if (rmContainer == null) {
- // Some unknown container sneaked into the system. Kill it.
- this.rmContext.getDispatcher().getEventHandler()
- .handle(new RMNodeCleanContainerEvent(
- node.getNodeID(), containerId));
- return;
- }
- rmContainer.handle(new RMContainerNMDoneChangeResourceEvent(
- containerId, increasedContainerReportedByNM.getResource()));
+
+ RMContainer rmContainer = getRMContainer(containerId);
+ if (rmContainer == null) {
+ // Some unknown container sneaked into the system. Kill it.
+ this.rmContext.getDispatcher().getEventHandler().handle(
+ new RMNodeCleanContainerEvent(node.getNodeID(), containerId));
+ return;
}
+ rmContainer.handle(new RMContainerNMDoneChangeResourceEvent(containerId,
+ increasedContainerReportedByNM.getResource()));
}
public T getApplicationAttempt(ApplicationAttemptId applicationAttemptId) {
-    SchedulerApplication<T> app =
- applications.get(applicationAttemptId.getApplicationId());
+    SchedulerApplication<T> app = applications.get(
+ applicationAttemptId.getApplicationId());
return app == null ? null : app.getCurrentAppAttempt();
}
@@ -338,96 +360,101 @@ public abstract class AbstractYarnScheduler
}
}
- public synchronized void recoverContainersOnNode(
+ public void recoverContainersOnNode(
       List<NMContainerStatus> containerReports, RMNode nm) {
- if (!rmContext.isWorkPreservingRecoveryEnabled()
- || containerReports == null
- || (containerReports != null && containerReports.isEmpty())) {
- return;
- }
-
- for (NMContainerStatus container : containerReports) {
- ApplicationId appId =
- container.getContainerId().getApplicationAttemptId().getApplicationId();
- RMApp rmApp = rmContext.getRMApps().get(appId);
- if (rmApp == null) {
- LOG.error("Skip recovering container " + container
- + " for unknown application.");
- killOrphanContainerOnNode(nm, container);
- continue;
+ try {
+ writeLock.lock();
+ if (!rmContext.isWorkPreservingRecoveryEnabled()
+ || containerReports == null || (containerReports != null
+ && containerReports.isEmpty())) {
+ return;
}
-      SchedulerApplication<T> schedulerApp = applications.get(appId);
- if (schedulerApp == null) {
- LOG.info("Skip recovering container " + container
- + " for unknown SchedulerApplication. Application current state is "
- + rmApp.getState());
- killOrphanContainerOnNode(nm, container);
- continue;
- }
-
- LOG.info("Recovering container " + container);
- SchedulerApplicationAttempt schedulerAttempt =
- schedulerApp.getCurrentAppAttempt();
-
- if (!rmApp.getApplicationSubmissionContext()
- .getKeepContainersAcrossApplicationAttempts()) {
- // Do not recover containers for stopped attempt or previous attempt.
- if (schedulerAttempt.isStopped()
- || !schedulerAttempt.getApplicationAttemptId().equals(
- container.getContainerId().getApplicationAttemptId())) {
- LOG.info("Skip recovering container " + container
- + " for already stopped attempt.");
+ for (NMContainerStatus container : containerReports) {
+ ApplicationId appId =
+ container.getContainerId().getApplicationAttemptId()
+ .getApplicationId();
+ RMApp rmApp = rmContext.getRMApps().get(appId);
+ if (rmApp == null) {
+ LOG.error("Skip recovering container " + container
+ + " for unknown application.");
killOrphanContainerOnNode(nm, container);
continue;
}
- }
- // create container
- RMContainer rmContainer = recoverAndCreateContainer(container, nm);
-
- // recover RMContainer
- rmContainer.handle(new RMContainerRecoverEvent(container.getContainerId(),
- container));
-
- // recover scheduler node
- SchedulerNode schedulerNode = nodeTracker.getNode(nm.getNodeID());
- schedulerNode.recoverContainer(rmContainer);
-
- // recover queue: update headroom etc.
- Queue queue = schedulerAttempt.getQueue();
- queue.recoverContainer(
- getClusterResource(), schedulerAttempt, rmContainer);
-
- // recover scheduler attempt
- schedulerAttempt.recoverContainer(schedulerNode, rmContainer);
-
- // set master container for the current running AMContainer for this
- // attempt.
- RMAppAttempt appAttempt = rmApp.getCurrentAppAttempt();
- if (appAttempt != null) {
- Container masterContainer = appAttempt.getMasterContainer();
-
- // Mark current running AMContainer's RMContainer based on the master
- // container ID stored in AppAttempt.
- if (masterContainer != null
- && masterContainer.getId().equals(rmContainer.getContainerId())) {
- ((RMContainerImpl)rmContainer).setAMContainer(true);
+        SchedulerApplication<T> schedulerApp = applications.get(appId);
+ if (schedulerApp == null) {
+ LOG.info("Skip recovering container " + container
+ + " for unknown SchedulerApplication. "
+ + "Application current state is " + rmApp.getState());
+ killOrphanContainerOnNode(nm, container);
+ continue;
}
- }
- synchronized (schedulerAttempt) {
-        Set<ContainerId> releases = schedulerAttempt.getPendingRelease();
- if (releases.contains(container.getContainerId())) {
+ LOG.info("Recovering container " + container);
+ SchedulerApplicationAttempt schedulerAttempt =
+ schedulerApp.getCurrentAppAttempt();
+
+ if (!rmApp.getApplicationSubmissionContext()
+ .getKeepContainersAcrossApplicationAttempts()) {
+ // Do not recover containers for stopped attempt or previous attempt.
+ if (schedulerAttempt.isStopped() || !schedulerAttempt
+ .getApplicationAttemptId().equals(
+ container.getContainerId().getApplicationAttemptId())) {
+ LOG.info("Skip recovering container " + container
+ + " for already stopped attempt.");
+ killOrphanContainerOnNode(nm, container);
+ continue;
+ }
+ }
+
+ // create container
+ RMContainer rmContainer = recoverAndCreateContainer(container, nm);
+
+ // recover RMContainer
+ rmContainer.handle(
+ new RMContainerRecoverEvent(container.getContainerId(), container));
+
+ // recover scheduler node
+ SchedulerNode schedulerNode = nodeTracker.getNode(nm.getNodeID());
+ schedulerNode.recoverContainer(rmContainer);
+
+ // recover queue: update headroom etc.
+ Queue queue = schedulerAttempt.getQueue();
+ queue.recoverContainer(getClusterResource(), schedulerAttempt,
+ rmContainer);
+
+ // recover scheduler attempt
+ schedulerAttempt.recoverContainer(schedulerNode, rmContainer);
+
+ // set master container for the current running AMContainer for this
+ // attempt.
+ RMAppAttempt appAttempt = rmApp.getCurrentAppAttempt();
+ if (appAttempt != null) {
+ Container masterContainer = appAttempt.getMasterContainer();
+
+ // Mark current running AMContainer's RMContainer based on the master
+ // container ID stored in AppAttempt.
+ if (masterContainer != null && masterContainer.getId().equals(
+ rmContainer.getContainerId())) {
+ ((RMContainerImpl) rmContainer).setAMContainer(true);
+ }
+ }
+
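+        // pendingRelease is now a thread-safe set, so remove() atomically
+        // replaces the contains()-then-remove() previously guarded by
+        // synchronized (schedulerAttempt).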
+ if (schedulerAttempt.getPendingRelease().remove(
+ container.getContainerId())) {
// release the container
- rmContainer.handle(new RMContainerFinishedEvent(container
- .getContainerId(), SchedulerUtils.createAbnormalContainerStatus(
- container.getContainerId(), SchedulerUtils.RELEASED_CONTAINER),
- RMContainerEventType.RELEASED));
- releases.remove(container.getContainerId());
+ rmContainer.handle(
+ new RMContainerFinishedEvent(container.getContainerId(),
+ SchedulerUtils
+ .createAbnormalContainerStatus(container.getContainerId(),
+ SchedulerUtils.RELEASED_CONTAINER),
+ RMContainerEventType.RELEASED));
LOG.info(container.getContainerId() + " is released by application.");
}
}
+ } finally {
+ writeLock.unlock();
}
}
@@ -492,17 +519,15 @@ public abstract class AbstractYarnScheduler
     for (SchedulerApplication<T> app : applications.values()) {
T attempt = app.getCurrentAppAttempt();
if (attempt != null) {
- synchronized (attempt) {
- for (ContainerId containerId : attempt.getPendingRelease()) {
- RMAuditLogger.logFailure(app.getUser(),
- AuditConstants.RELEASE_CONTAINER,
- "Unauthorized access or invalid container", "Scheduler",
- "Trying to release container not owned by app "
- + "or with invalid id.", attempt.getApplicationId(),
- containerId, null);
- }
- attempt.getPendingRelease().clear();
+ for (ContainerId containerId : attempt.getPendingRelease()) {
+ RMAuditLogger.logFailure(app.getUser(),
+ AuditConstants.RELEASE_CONTAINER,
+ "Unauthorized access or invalid container", "Scheduler",
+ "Trying to release container not owned by app "
+ + "or with invalid id.", attempt.getApplicationId(),
+ containerId, null);
}
+ attempt.getPendingRelease().clear();
}
}
}
@@ -558,9 +583,7 @@ public abstract class AbstractYarnScheduler
< nmExpireInterval) {
LOG.info(containerId + " doesn't exist. Add the container"
+ " to the release request cache as it maybe on recovery.");
- synchronized (attempt) {
- attempt.getPendingRelease().add(containerId);
- }
+ attempt.getPendingRelease().add(containerId);
} else {
RMAuditLogger.logFailure(attempt.getUser(),
AuditConstants.RELEASE_CONTAINER,
@@ -603,81 +626,92 @@ public abstract class AbstractYarnScheduler
}
@Override
- public synchronized void moveAllApps(String sourceQueue, String destQueue)
+ public void moveAllApps(String sourceQueue, String destQueue)
throws YarnException {
- // check if destination queue is a valid leaf queue
try {
- getQueueInfo(destQueue, false, false);
- } catch (IOException e) {
- LOG.warn(e);
- throw new YarnException(e);
- }
- // check if source queue is a valid
-    List<ApplicationAttemptId> apps = getAppsInQueue(sourceQueue);
- if (apps == null) {
- String errMsg = "The specified Queue: " + sourceQueue + " doesn't exist";
- LOG.warn(errMsg);
- throw new YarnException(errMsg);
- }
- // generate move events for each pending/running app
- for (ApplicationAttemptId app : apps) {
-      SettableFuture<Object> future = SettableFuture.create();
- this.rmContext
- .getDispatcher()
- .getEventHandler()
- .handle(new RMAppMoveEvent(app.getApplicationId(), destQueue, future));
+ writeLock.lock();
+ // check if destination queue is a valid leaf queue
+ try {
+ getQueueInfo(destQueue, false, false);
+ } catch (IOException e) {
+ LOG.warn(e);
+ throw new YarnException(e);
+ }
+      // check if source queue is valid
+      List<ApplicationAttemptId> apps = getAppsInQueue(sourceQueue);
+ if (apps == null) {
+ String errMsg =
+ "The specified Queue: " + sourceQueue + " doesn't exist";
+ LOG.warn(errMsg);
+ throw new YarnException(errMsg);
+ }
+ // generate move events for each pending/running app
+ for (ApplicationAttemptId app : apps) {
+        SettableFuture<Object> future = SettableFuture.create();
+ this.rmContext.getDispatcher().getEventHandler().handle(
+ new RMAppMoveEvent(app.getApplicationId(), destQueue, future));
+ }
+ } finally {
+ writeLock.unlock();
}
}
@Override
- public synchronized void killAllAppsInQueue(String queueName)
+ public void killAllAppsInQueue(String queueName)
throws YarnException {
- // check if queue is a valid
-    List<ApplicationAttemptId> apps = getAppsInQueue(queueName);
- if (apps == null) {
- String errMsg = "The specified Queue: " + queueName + " doesn't exist";
- LOG.warn(errMsg);
- throw new YarnException(errMsg);
- }
- // generate kill events for each pending/running app
- for (ApplicationAttemptId app : apps) {
- this.rmContext
- .getDispatcher()
- .getEventHandler()
- .handle(new RMAppEvent(app.getApplicationId(), RMAppEventType.KILL,
- "Application killed due to expiry of reservation queue " +
- queueName + "."));
+ try {
+ writeLock.lock();
+      // check if queue is valid
+      List<ApplicationAttemptId> apps = getAppsInQueue(queueName);
+ if (apps == null) {
+ String errMsg = "The specified Queue: " + queueName + " doesn't exist";
+ LOG.warn(errMsg);
+ throw new YarnException(errMsg);
+ }
+ // generate kill events for each pending/running app
+ for (ApplicationAttemptId app : apps) {
+ this.rmContext.getDispatcher().getEventHandler().handle(
+ new RMAppEvent(app.getApplicationId(), RMAppEventType.KILL,
+ "Application killed due to expiry of reservation queue "
+ + queueName + "."));
+ }
+ } finally {
+ writeLock.unlock();
}
}
/**
* Process resource update on a node.
*/
- public synchronized void updateNodeResource(RMNode nm,
+ public void updateNodeResource(RMNode nm,
ResourceOption resourceOption) {
- SchedulerNode node = getSchedulerNode(nm.getNodeID());
- Resource newResource = resourceOption.getResource();
- Resource oldResource = node.getTotalResource();
- if(!oldResource.equals(newResource)) {
- // Notify NodeLabelsManager about this change
- rmContext.getNodeLabelManager().updateNodeResource(nm.getNodeID(),
- newResource);
-
- // Log resource change
- LOG.info("Update resource on node: " + node.getNodeName()
- + " from: " + oldResource + ", to: "
- + newResource);
+ try {
+ writeLock.lock();
+ SchedulerNode node = getSchedulerNode(nm.getNodeID());
+ Resource newResource = resourceOption.getResource();
+ Resource oldResource = node.getTotalResource();
+ if (!oldResource.equals(newResource)) {
+ // Notify NodeLabelsManager about this change
+ rmContext.getNodeLabelManager().updateNodeResource(nm.getNodeID(),
+ newResource);
- nodeTracker.removeNode(nm.getNodeID());
+ // Log resource change
+ LOG.info("Update resource on node: " + node.getNodeName() + " from: "
+ + oldResource + ", to: " + newResource);
- // update resource to node
- node.updateTotalResource(newResource);
+ nodeTracker.removeNode(nm.getNodeID());
- nodeTracker.addNode((N) node);
- } else {
- // Log resource change
- LOG.warn("Update resource on node: " + node.getNodeName()
- + " with the same resource: " + newResource);
+ // update resource to node
+ node.updateTotalResource(newResource);
+
+ nodeTracker.addNode((N) node);
+      } else {
+ // Log resource change
+ LOG.warn("Update resource on node: " + node.getNodeName()
+ + " with the same resource: " + newResource);
+ }
+ } finally {
+ writeLock.unlock();
}
}
@@ -735,7 +769,7 @@ public abstract class AbstractYarnScheduler
}
@Override
- public synchronized void setClusterMaxPriority(Configuration conf)
+ public void setClusterMaxPriority(Configuration conf)
throws YarnException {
try {
maxClusterLevelAppPriority = getMaxPriorityFromConf(conf);
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java
index 9675fac5ad7..d148132ce44 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java
@@ -19,6 +19,7 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler;
import java.util.ArrayList;
import java.util.Collection;
+import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
@@ -186,7 +187,8 @@ public class SchedulerApplicationAttempt implements SchedulableEntity {
new AppSchedulingInfo(applicationAttemptId, user, queue,
activeUsersManager, rmContext.getEpoch(), attemptResourceUsage);
this.queue = queue;
-    this.pendingRelease = new HashSet<ContainerId>();
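+    // Backed by a ConcurrentHashMap so pendingRelease can be updated without
+    // synchronizing on the attempt.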
+ this.pendingRelease = Collections.newSetFromMap(
+        new ConcurrentHashMap<ContainerId, Boolean>());
this.attemptId = applicationAttemptId;
if (rmContext.getRMApps() != null &&
rmContext.getRMApps()
@@ -1191,6 +1193,10 @@ public class SchedulerApplicationAttempt implements SchedulableEntity {
// queue's resource usage for specific partition
}
+ public ReentrantReadWriteLock.WriteLock getWriteLock() {
+ return writeLock;
+ }
+
@Override
public boolean isRecovering() {
return isAttemptRecovering;
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
index 33fe9ad2673..6d00beee8f4 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
@@ -39,7 +39,6 @@ import org.apache.commons.lang.StringUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience.LimitedPrivate;
-import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceStability.Evolving;
import org.apache.hadoop.conf.Configurable;
import org.apache.hadoop.conf.Configuration;
@@ -267,8 +266,7 @@ public class CapacityScheduler extends
}
@Override
- public synchronized RMContainerTokenSecretManager
- getContainerTokenSecretManager() {
+ public RMContainerTokenSecretManager getContainerTokenSecretManager() {
return this.rmContext.getContainerTokenSecretManager();
}
@@ -293,52 +291,62 @@ public class CapacityScheduler extends
}
@Override
- public synchronized RMContext getRMContext() {
+ public RMContext getRMContext() {
return this.rmContext;
}
@Override
- public synchronized void setRMContext(RMContext rmContext) {
+ public void setRMContext(RMContext rmContext) {
this.rmContext = rmContext;
}
- private synchronized void initScheduler(Configuration configuration) throws
+ private void initScheduler(Configuration configuration) throws
IOException {
- this.conf = loadCapacitySchedulerConfiguration(configuration);
- validateConf(this.conf);
- this.minimumAllocation = this.conf.getMinimumAllocation();
- initMaximumResourceCapability(this.conf.getMaximumAllocation());
- this.calculator = this.conf.getResourceCalculator();
- this.usePortForNodeName = this.conf.getUsePortForNodeName();
- this.applications = new ConcurrentHashMap<>();
- this.labelManager = rmContext.getNodeLabelManager();
- authorizer = YarnAuthorizationProvider.getInstance(yarnConf);
- this.activitiesManager = new ActivitiesManager(rmContext);
- activitiesManager.init(conf);
- initializeQueues(this.conf);
- this.isLazyPreemptionEnabled = conf.getLazyPreemptionEnabled();
+ try {
+ writeLock.lock();
+ this.conf = loadCapacitySchedulerConfiguration(configuration);
+ validateConf(this.conf);
+ this.minimumAllocation = this.conf.getMinimumAllocation();
+ initMaximumResourceCapability(this.conf.getMaximumAllocation());
+ this.calculator = this.conf.getResourceCalculator();
+ this.usePortForNodeName = this.conf.getUsePortForNodeName();
+ this.applications = new ConcurrentHashMap<>();
+ this.labelManager = rmContext.getNodeLabelManager();
+ authorizer = YarnAuthorizationProvider.getInstance(yarnConf);
+ this.activitiesManager = new ActivitiesManager(rmContext);
+ activitiesManager.init(conf);
+ initializeQueues(this.conf);
+ this.isLazyPreemptionEnabled = conf.getLazyPreemptionEnabled();
- scheduleAsynchronously = this.conf.getScheduleAynschronously();
- asyncScheduleInterval =
- this.conf.getLong(ASYNC_SCHEDULER_INTERVAL,
- DEFAULT_ASYNC_SCHEDULER_INTERVAL);
- if (scheduleAsynchronously) {
- asyncSchedulerThread = new AsyncScheduleThread(this);
+ scheduleAsynchronously = this.conf.getScheduleAynschronously();
+ asyncScheduleInterval = this.conf.getLong(ASYNC_SCHEDULER_INTERVAL,
+ DEFAULT_ASYNC_SCHEDULER_INTERVAL);
+ if (scheduleAsynchronously) {
+ asyncSchedulerThread = new AsyncScheduleThread(this);
+ }
+
+ LOG.info("Initialized CapacityScheduler with " + "calculator="
+ + getResourceCalculator().getClass() + ", " + "minimumAllocation=<"
+ + getMinimumResourceCapability() + ">, " + "maximumAllocation=<"
+ + getMaximumResourceCapability() + ">, " + "asynchronousScheduling="
+ + scheduleAsynchronously + ", " + "asyncScheduleInterval="
+ + asyncScheduleInterval + "ms");
+ } finally {
+ writeLock.unlock();
}
-
- LOG.info("Initialized CapacityScheduler with " +
- "calculator=" + getResourceCalculator().getClass() + ", " +
- "minimumAllocation=<" + getMinimumResourceCapability() + ">, " +
- "maximumAllocation=<" + getMaximumResourceCapability() + ">, " +
- "asynchronousScheduling=" + scheduleAsynchronously + ", " +
- "asyncScheduleInterval=" + asyncScheduleInterval + "ms");
}
- private synchronized void startSchedulerThreads() {
- if (scheduleAsynchronously) {
- Preconditions.checkNotNull(asyncSchedulerThread,
- "asyncSchedulerThread is null");
- asyncSchedulerThread.start();
+ private void startSchedulerThreads() {
+ try {
+ writeLock.lock();
+ activitiesManager.start();
+ if (scheduleAsynchronously) {
+ Preconditions.checkNotNull(asyncSchedulerThread,
+ "asyncSchedulerThread is null");
+ asyncSchedulerThread.start();
+ }
+ } finally {
+ writeLock.unlock();
}
}
@@ -352,40 +360,48 @@ public class CapacityScheduler extends
@Override
public void serviceStart() throws Exception {
startSchedulerThreads();
- activitiesManager.start();
super.serviceStart();
}
@Override
public void serviceStop() throws Exception {
- synchronized (this) {
+ try {
+ writeLock.lock();
if (scheduleAsynchronously && asyncSchedulerThread != null) {
asyncSchedulerThread.interrupt();
asyncSchedulerThread.join(THREAD_JOIN_TIMEOUT_MS);
}
+ } finally {
+ writeLock.unlock();
}
+
super.serviceStop();
}
@Override
- public synchronized void
- reinitialize(Configuration conf, RMContext rmContext) throws IOException {
- Configuration configuration = new Configuration(conf);
- CapacitySchedulerConfiguration oldConf = this.conf;
- this.conf = loadCapacitySchedulerConfiguration(configuration);
- validateConf(this.conf);
+ public void reinitialize(Configuration newConf, RMContext rmContext)
+ throws IOException {
try {
- LOG.info("Re-initializing queues...");
- refreshMaximumAllocation(this.conf.getMaximumAllocation());
- reinitializeQueues(this.conf);
- } catch (Throwable t) {
- this.conf = oldConf;
- refreshMaximumAllocation(this.conf.getMaximumAllocation());
- throw new IOException("Failed to re-init queues", t);
- }
+ writeLock.lock();
+ Configuration configuration = new Configuration(newConf);
+ CapacitySchedulerConfiguration oldConf = this.conf;
+ this.conf = loadCapacitySchedulerConfiguration(configuration);
+ validateConf(this.conf);
+ try {
+ LOG.info("Re-initializing queues...");
+ refreshMaximumAllocation(this.conf.getMaximumAllocation());
+ reinitializeQueues(this.conf);
+ } catch (Throwable t) {
+ this.conf = oldConf;
+ refreshMaximumAllocation(this.conf.getMaximumAllocation());
+ throw new IOException("Failed to re-init queues", t);
+ }
- // update lazy preemption
- this.isLazyPreemptionEnabled = this.conf.getLazyPreemptionEnabled();
+ // update lazy preemption
+ this.isLazyPreemptionEnabled = this.conf.getLazyPreemptionEnabled();
+ } finally {
+ writeLock.unlock();
+ }
}
long getAsyncScheduleInterval() {
@@ -449,10 +465,6 @@ public class CapacityScheduler extends
}
}
-
- @Private
- public static final String ROOT_QUEUE =
- CapacitySchedulerConfiguration.PREFIX + CapacitySchedulerConfiguration.ROOT;
static class QueueHook {
public CSQueue hook(CSQueue queue) {
@@ -462,38 +474,41 @@ public class CapacityScheduler extends
private static final QueueHook noop = new QueueHook();
@VisibleForTesting
- public synchronized UserGroupMappingPlacementRule
+ public UserGroupMappingPlacementRule
getUserGroupMappingPlacementRule() throws IOException {
- boolean overrideWithQueueMappings = conf.getOverrideWithQueueMappings();
- LOG.info("Initialized queue mappings, override: "
- + overrideWithQueueMappings);
+ try {
+ readLock.lock();
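+      // Only reads scheduler state (conf and the queues map), so the read
+      // lock is sufficient.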
+ boolean overrideWithQueueMappings = conf.getOverrideWithQueueMappings();
+ LOG.info(
+ "Initialized queue mappings, override: " + overrideWithQueueMappings);
- // Get new user/group mappings
-    List<QueueMapping> newMappings =
- conf.getQueueMappings();
- // check if mappings refer to valid queues
- for (QueueMapping mapping : newMappings) {
- String mappingQueue = mapping.getQueue();
- if (!mappingQueue
- .equals(UserGroupMappingPlacementRule.CURRENT_USER_MAPPING)
- && !mappingQueue
- .equals(UserGroupMappingPlacementRule.PRIMARY_GROUP_MAPPING)) {
- CSQueue queue = queues.get(mappingQueue);
- if (queue == null || !(queue instanceof LeafQueue)) {
- throw new IOException("mapping contains invalid or non-leaf queue "
- + mappingQueue);
+ // Get new user/group mappings
+      List<QueueMapping> newMappings = conf.getQueueMappings();
+ // check if mappings refer to valid queues
+ for (QueueMapping mapping : newMappings) {
+ String mappingQueue = mapping.getQueue();
+ if (!mappingQueue.equals(
+ UserGroupMappingPlacementRule.CURRENT_USER_MAPPING) && !mappingQueue
+ .equals(UserGroupMappingPlacementRule.PRIMARY_GROUP_MAPPING)) {
+ CSQueue queue = queues.get(mappingQueue);
+ if (queue == null || !(queue instanceof LeafQueue)) {
+ throw new IOException(
+ "mapping contains invalid or non-leaf queue " + mappingQueue);
+ }
}
}
- }
- // initialize groups if mappings are present
- if (newMappings.size() > 0) {
- Groups groups = new Groups(conf);
- return new UserGroupMappingPlacementRule(overrideWithQueueMappings,
- newMappings, groups);
- }
+ // initialize groups if mappings are present
+ if (newMappings.size() > 0) {
+ Groups groups = new Groups(conf);
+ return new UserGroupMappingPlacementRule(overrideWithQueueMappings,
+ newMappings, groups);
+ }
- return null;
+ return null;
+ } finally {
+ readLock.unlock();
+ }
}
private void updatePlacementRules() throws IOException {
@@ -526,12 +541,12 @@ public class CapacityScheduler extends
}
@Lock(CapacityScheduler.class)
- private void reinitializeQueues(CapacitySchedulerConfiguration conf)
+ private void reinitializeQueues(CapacitySchedulerConfiguration newConf)
throws IOException {
// Parse new queues
     Map<String, CSQueue> newQueues = new HashMap<String, CSQueue>();
CSQueue newRoot =
- parseQueue(this, conf, null, CapacitySchedulerConfiguration.ROOT,
+ parseQueue(this, newConf, null, CapacitySchedulerConfiguration.ROOT,
newQueues, queues, noop);
// Ensure all existing queues are still present
@@ -693,248 +708,279 @@ public class CapacityScheduler extends
return queues.get(queueName);
}
- private synchronized void addApplicationOnRecovery(
+ private void addApplicationOnRecovery(
ApplicationId applicationId, String queueName, String user,
Priority priority) {
- CSQueue queue = getQueue(queueName);
- if (queue == null) {
- //During a restart, this indicates a queue was removed, which is
- //not presently supported
- if (!YarnConfiguration.shouldRMFailFast(getConfig())) {
- this.rmContext.getDispatcher().getEventHandler().handle(
- new RMAppEvent(applicationId, RMAppEventType.KILL,
- "Application killed on recovery as it was submitted to queue " +
- queueName + " which no longer exists after restart."));
- return;
- } else {
- String queueErrorMsg = "Queue named " + queueName
- + " missing during application recovery."
- + " Queue removal during recovery is not presently supported by the"
- + " capacity scheduler, please restart with all queues configured"
- + " which were present before shutdown/restart.";
- LOG.fatal(queueErrorMsg);
- throw new QueueInvalidException(queueErrorMsg);
- }
- }
- if (!(queue instanceof LeafQueue)) {
- // During RM restart, this means leaf queue was converted to a parent
- // queue, which is not supported for running apps.
- if (!YarnConfiguration.shouldRMFailFast(getConfig())) {
- this.rmContext.getDispatcher().getEventHandler().handle(
- new RMAppEvent(applicationId, RMAppEventType.KILL,
- "Application killed on recovery as it was submitted to queue " +
- queueName + " which is no longer a leaf queue after restart."));
- return;
- } else {
- String queueErrorMsg = "Queue named " + queueName
- + " is no longer a leaf queue during application recovery."
- + " Changing a leaf queue to a parent queue during recovery is"
- + " not presently supported by the capacity scheduler. Please"
- + " restart with leaf queues before shutdown/restart continuing"
- + " as leaf queues.";
- LOG.fatal(queueErrorMsg);
- throw new QueueInvalidException(queueErrorMsg);
- }
- }
- // Submit to the queue
try {
- queue.submitApplication(applicationId, user, queueName);
- } catch (AccessControlException ace) {
- // Ignore the exception for recovered app as the app was previously
- // accepted.
- }
- queue.getMetrics().submitApp(user);
-    SchedulerApplication<FiCaSchedulerApp> application =
-        new SchedulerApplication<FiCaSchedulerApp>(queue, user, priority);
- applications.put(applicationId, application);
- LOG.info("Accepted application " + applicationId + " from user: " + user
- + ", in queue: " + queueName);
- if (LOG.isDebugEnabled()) {
- LOG.debug(applicationId + " is recovering. Skip notifying APP_ACCEPTED");
+ writeLock.lock();
+ CSQueue queue = getQueue(queueName);
+ if (queue == null) {
+ //During a restart, this indicates a queue was removed, which is
+ //not presently supported
+ if (!YarnConfiguration.shouldRMFailFast(getConfig())) {
+ this.rmContext.getDispatcher().getEventHandler().handle(
+ new RMAppEvent(applicationId, RMAppEventType.KILL,
+ "Application killed on recovery as it was submitted to queue "
+ + queueName + " which no longer exists after restart."));
+ return;
+        } else {
+ String queueErrorMsg = "Queue named " + queueName
+ + " missing during application recovery."
+ + " Queue removal during recovery is not presently "
+ + "supported by the capacity scheduler, please "
+ + "restart with all queues configured"
+ + " which were present before shutdown/restart.";
+ LOG.fatal(queueErrorMsg);
+ throw new QueueInvalidException(queueErrorMsg);
+ }
+ }
+ if (!(queue instanceof LeafQueue)) {
+ // During RM restart, this means leaf queue was converted to a parent
+ // queue, which is not supported for running apps.
+ if (!YarnConfiguration.shouldRMFailFast(getConfig())) {
+ this.rmContext.getDispatcher().getEventHandler().handle(
+ new RMAppEvent(applicationId, RMAppEventType.KILL,
+ "Application killed on recovery as it was submitted to queue "
+ + queueName
+ + " which is no longer a leaf queue after restart."));
+ return;
+        } else {
+ String queueErrorMsg = "Queue named " + queueName
+ + " is no longer a leaf queue during application recovery."
+ + " Changing a leaf queue to a parent queue during recovery is"
+ + " not presently supported by the capacity scheduler. Please"
+ + " restart with leaf queues before shutdown/restart continuing"
+ + " as leaf queues.";
+ LOG.fatal(queueErrorMsg);
+ throw new QueueInvalidException(queueErrorMsg);
+ }
+ }
+ // Submit to the queue
+ try {
+ queue.submitApplication(applicationId, user, queueName);
+ } catch (AccessControlException ace) {
+ // Ignore the exception for recovered app as the app was previously
+ // accepted.
+ }
+ queue.getMetrics().submitApp(user);
+      SchedulerApplication<FiCaSchedulerApp> application =
+          new SchedulerApplication<FiCaSchedulerApp>(queue, user, priority);
+ applications.put(applicationId, application);
+ LOG.info("Accepted application " + applicationId + " from user: " + user
+ + ", in queue: " + queueName);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug(
+ applicationId + " is recovering. Skip notifying APP_ACCEPTED");
+ }
+ } finally {
+ writeLock.unlock();
}
}
- private synchronized void addApplication(ApplicationId applicationId,
+ private void addApplication(ApplicationId applicationId,
String queueName, String user, Priority priority) {
- // Sanity checks.
- CSQueue queue = getQueue(queueName);
- if (queue == null) {
- String message = "Application " + applicationId +
- " submitted by user " + user + " to unknown queue: " + queueName;
- this.rmContext.getDispatcher().getEventHandler()
- .handle(new RMAppEvent(applicationId,
- RMAppEventType.APP_REJECTED, message));
- return;
- }
- if (!(queue instanceof LeafQueue)) {
- String message = "Application " + applicationId +
- " submitted by user " + user + " to non-leaf queue: " + queueName;
- this.rmContext.getDispatcher().getEventHandler()
- .handle(new RMAppEvent(applicationId,
- RMAppEventType.APP_REJECTED, message));
- return;
- }
- // Submit to the queue
try {
- queue.submitApplication(applicationId, user, queueName);
- } catch (AccessControlException ace) {
- LOG.info("Failed to submit application " + applicationId + " to queue "
- + queueName + " from user " + user, ace);
- this.rmContext.getDispatcher().getEventHandler()
- .handle(new RMAppEvent(applicationId,
- RMAppEventType.APP_REJECTED, ace.toString()));
- return;
+ writeLock.lock();
+ // Sanity checks.
+ CSQueue queue = getQueue(queueName);
+ if (queue == null) {
+ String message =
+ "Application " + applicationId + " submitted by user " + user
+ + " to unknown queue: " + queueName;
+ this.rmContext.getDispatcher().getEventHandler().handle(
+ new RMAppEvent(applicationId, RMAppEventType.APP_REJECTED,
+ message));
+ return;
+ }
+ if (!(queue instanceof LeafQueue)) {
+ String message =
+ "Application " + applicationId + " submitted by user " + user
+ + " to non-leaf queue: " + queueName;
+ this.rmContext.getDispatcher().getEventHandler().handle(
+ new RMAppEvent(applicationId, RMAppEventType.APP_REJECTED,
+ message));
+ return;
+ }
+ // Submit to the queue
+ try {
+ queue.submitApplication(applicationId, user, queueName);
+ } catch (AccessControlException ace) {
+ LOG.info("Failed to submit application " + applicationId + " to queue "
+ + queueName + " from user " + user, ace);
+ this.rmContext.getDispatcher().getEventHandler().handle(
+ new RMAppEvent(applicationId, RMAppEventType.APP_REJECTED,
+ ace.toString()));
+ return;
+ }
+ // update the metrics
+ queue.getMetrics().submitApp(user);
+      SchedulerApplication<FiCaSchedulerApp> application =
+          new SchedulerApplication<FiCaSchedulerApp>(queue, user, priority);
+ applications.put(applicationId, application);
+ LOG.info("Accepted application " + applicationId + " from user: " + user
+ + ", in queue: " + queueName);
+ rmContext.getDispatcher().getEventHandler().handle(
+ new RMAppEvent(applicationId, RMAppEventType.APP_ACCEPTED));
+ } finally {
+ writeLock.unlock();
}
- // update the metrics
- queue.getMetrics().submitApp(user);
-    SchedulerApplication<FiCaSchedulerApp> application =
-        new SchedulerApplication<FiCaSchedulerApp>(queue, user, priority);
- applications.put(applicationId, application);
- LOG.info("Accepted application " + applicationId + " from user: " + user
- + ", in queue: " + queueName);
- rmContext.getDispatcher().getEventHandler()
- .handle(new RMAppEvent(applicationId, RMAppEventType.APP_ACCEPTED));
}
- private synchronized void addApplicationAttempt(
+ private void addApplicationAttempt(
ApplicationAttemptId applicationAttemptId,
boolean transferStateFromPreviousAttempt,
boolean isAttemptRecovering) {
-    SchedulerApplication<FiCaSchedulerApp> application =
- applications.get(applicationAttemptId.getApplicationId());
- if (application == null) {
- LOG.warn("Application " + applicationAttemptId.getApplicationId() +
- " cannot be found in scheduler.");
- return;
- }
- CSQueue queue = (CSQueue) application.getQueue();
-
- FiCaSchedulerApp attempt = new FiCaSchedulerApp(applicationAttemptId,
- application.getUser(), queue, queue.getActiveUsersManager(), rmContext,
- application.getPriority(), isAttemptRecovering, activitiesManager);
- if (transferStateFromPreviousAttempt) {
- attempt.transferStateFromPreviousAttempt(
- application.getCurrentAppAttempt());
- }
- application.setCurrentAppAttempt(attempt);
-
- // Update attempt priority to the latest to avoid race condition i.e
- // SchedulerApplicationAttempt is created with old priority but it is not
- // set to SchedulerApplication#setCurrentAppAttempt.
- // Scenario would occur is
- // 1. SchdulerApplicationAttempt is created with old priority.
- // 2. updateApplicationPriority() updates SchedulerApplication. Since
- // currentAttempt is null, it just return.
- // 3. ScheduelerApplcationAttempt is set in
- // SchedulerApplication#setCurrentAppAttempt.
- attempt.setPriority(application.getPriority());
-
- queue.submitApplicationAttempt(attempt, application.getUser());
- LOG.info("Added Application Attempt " + applicationAttemptId
- + " to scheduler from user " + application.getUser() + " in queue "
- + queue.getQueueName());
- if (isAttemptRecovering) {
- if (LOG.isDebugEnabled()) {
- LOG.debug(applicationAttemptId
- + " is recovering. Skipping notifying ATTEMPT_ADDED");
+ try {
+ writeLock.lock();
+      SchedulerApplication<FiCaSchedulerApp> application = applications.get(
+ applicationAttemptId.getApplicationId());
+ if (application == null) {
+ LOG.warn("Application " + applicationAttemptId.getApplicationId()
+ + " cannot be found in scheduler.");
+ return;
}
- } else {
- rmContext.getDispatcher().getEventHandler().handle(
- new RMAppAttemptEvent(applicationAttemptId,
- RMAppAttemptEventType.ATTEMPT_ADDED));
- }
- }
+ CSQueue queue = (CSQueue) application.getQueue();
- private synchronized void doneApplication(ApplicationId applicationId,
- RMAppState finalState) {
-    SchedulerApplication<FiCaSchedulerApp> application =
- applications.get(applicationId);
- if (application == null){
- // The AppRemovedSchedulerEvent maybe sent on recovery for completed apps,
- // ignore it.
- LOG.warn("Couldn't find application " + applicationId);
- return;
- }
- CSQueue queue = (CSQueue) application.getQueue();
- if (!(queue instanceof LeafQueue)) {
- LOG.error("Cannot finish application " + "from non-leaf queue: "
+ FiCaSchedulerApp attempt = new FiCaSchedulerApp(applicationAttemptId,
+ application.getUser(), queue, queue.getActiveUsersManager(),
+ rmContext, application.getPriority(), isAttemptRecovering,
+ activitiesManager);
+ if (transferStateFromPreviousAttempt) {
+ attempt.transferStateFromPreviousAttempt(
+ application.getCurrentAppAttempt());
+ }
+ application.setCurrentAppAttempt(attempt);
+
+      // Update attempt priority to the latest to avoid a race condition, i.e.
+      // SchedulerApplicationAttempt is created with the old priority but is
+      // not yet set via SchedulerApplication#setCurrentAppAttempt.
+      // The scenario would be:
+      // 1. SchedulerApplicationAttempt is created with the old priority.
+      // 2. updateApplicationPriority() updates SchedulerApplication. Since
+      //    currentAttempt is null, it just returns.
+      // 3. SchedulerApplicationAttempt is set in
+      //    SchedulerApplication#setCurrentAppAttempt.
+ attempt.setPriority(application.getPriority());
+
+ queue.submitApplicationAttempt(attempt, application.getUser());
+ LOG.info("Added Application Attempt " + applicationAttemptId
+ + " to scheduler from user " + application.getUser() + " in queue "
+          + queue.getQueueName());
- } else {
- queue.finishApplication(applicationId, application.getUser());
+ if (isAttemptRecovering) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug(applicationAttemptId
+ + " is recovering. Skipping notifying ATTEMPT_ADDED");
+ }
+      } else {
+ rmContext.getDispatcher().getEventHandler().handle(
+ new RMAppAttemptEvent(applicationAttemptId,
+ RMAppAttemptEventType.ATTEMPT_ADDED));
+ }
+ } finally {
+ writeLock.unlock();
}
- application.stop(finalState);
- applications.remove(applicationId);
}
- private synchronized void doneApplicationAttempt(
+ private void doneApplication(ApplicationId applicationId,
+ RMAppState finalState) {
+ try {
+ writeLock.lock();
+      SchedulerApplication<FiCaSchedulerApp> application = applications.get(
+ applicationId);
+ if (application == null) {
+ // The AppRemovedSchedulerEvent maybe sent on recovery for completed
+ // apps, ignore it.
+ LOG.warn("Couldn't find application " + applicationId);
+ return;
+ }
+ CSQueue queue = (CSQueue) application.getQueue();
+ if (!(queue instanceof LeafQueue)) {
+ LOG.error("Cannot finish application " + "from non-leaf queue: " + queue
+ .getQueueName());
+      } else {
+ queue.finishApplication(applicationId, application.getUser());
+ }
+ application.stop(finalState);
+ applications.remove(applicationId);
+ } finally {
+ writeLock.unlock();
+ }
+ }
+
+ private void doneApplicationAttempt(
ApplicationAttemptId applicationAttemptId,
RMAppAttemptState rmAppAttemptFinalState, boolean keepContainers) {
- LOG.info("Application Attempt " + applicationAttemptId + " is done." +
- " finalState=" + rmAppAttemptFinalState);
-
- FiCaSchedulerApp attempt = getApplicationAttempt(applicationAttemptId);
-    SchedulerApplication<FiCaSchedulerApp> application =
- applications.get(applicationAttemptId.getApplicationId());
+ try {
+ writeLock.lock();
+ LOG.info("Application Attempt " + applicationAttemptId + " is done."
+ + " finalState=" + rmAppAttemptFinalState);
- if (application == null || attempt == null) {
- LOG.info("Unknown application " + applicationAttemptId + " has completed!");
- return;
- }
+ FiCaSchedulerApp attempt = getApplicationAttempt(applicationAttemptId);
+      SchedulerApplication<FiCaSchedulerApp> application = applications.get(
+ applicationAttemptId.getApplicationId());
- // Release all the allocated, acquired, running containers
- for (RMContainer rmContainer : attempt.getLiveContainers()) {
- if (keepContainers
- && rmContainer.getState().equals(RMContainerState.RUNNING)) {
- // do not kill the running container in the case of work-preserving AM
- // restart.
- LOG.info("Skip killing " + rmContainer.getContainerId());
- continue;
+ if (application == null || attempt == null) {
+ LOG.info(
+ "Unknown application " + applicationAttemptId + " has completed!");
+ return;
}
- super.completedContainer(
- rmContainer,
- SchedulerUtils.createAbnormalContainerStatus(
- rmContainer.getContainerId(), SchedulerUtils.COMPLETED_APPLICATION),
- RMContainerEventType.KILL);
- }
- // Release all reserved containers
- for (RMContainer rmContainer : attempt.getReservedContainers()) {
- super.completedContainer(
- rmContainer,
- SchedulerUtils.createAbnormalContainerStatus(
- rmContainer.getContainerId(), "Application Complete"),
- RMContainerEventType.KILL);
- }
+ // Release all the allocated, acquired, running containers
+ for (RMContainer rmContainer : attempt.getLiveContainers()) {
+ if (keepContainers && rmContainer.getState().equals(
+ RMContainerState.RUNNING)) {
+ // do not kill the running container in the case of work-preserving AM
+ // restart.
+ LOG.info("Skip killing " + rmContainer.getContainerId());
+ continue;
+ }
+ super.completedContainer(rmContainer, SchedulerUtils
+ .createAbnormalContainerStatus(rmContainer.getContainerId(),
+ SchedulerUtils.COMPLETED_APPLICATION),
+ RMContainerEventType.KILL);
+ }
- // Clean up pending requests, metrics etc.
- attempt.stop(rmAppAttemptFinalState);
+ // Release all reserved containers
+ for (RMContainer rmContainer : attempt.getReservedContainers()) {
+ super.completedContainer(rmContainer, SchedulerUtils
+ .createAbnormalContainerStatus(rmContainer.getContainerId(),
+ "Application Complete"), RMContainerEventType.KILL);
+ }
- // Inform the queue
- String queueName = attempt.getQueue().getQueueName();
- CSQueue queue = queues.get(queueName);
- if (!(queue instanceof LeafQueue)) {
- LOG.error("Cannot finish application " + "from non-leaf queue: "
- + queueName);
- } else {
- queue.finishApplicationAttempt(attempt, queue.getQueueName());
+ // Clean up pending requests, metrics etc.
+ attempt.stop(rmAppAttemptFinalState);
+
+ // Inform the queue
+ String queueName = attempt.getQueue().getQueueName();
+ CSQueue queue = queues.get(queueName);
+ if (!(queue instanceof LeafQueue)) {
+ LOG.error(
+ "Cannot finish application " + "from non-leaf queue: " + queueName);
+      } else {
+ queue.finishApplicationAttempt(attempt, queue.getQueueName());
+ }
+ } finally {
+ writeLock.unlock();
}
}
- // It is crucial to acquire leaf queue lock first to prevent:
- // 1. Race condition when calculating the delta resource in
- // SchedContainerChangeRequest
- // 2. Deadlock with the scheduling thread.
private LeafQueue updateIncreaseRequests(
-      List<ContainerResourceChangeRequest> increaseRequests,
- FiCaSchedulerApp app) {
+      List<ContainerResourceChangeRequest> increaseRequests, FiCaSchedulerApp app) {
if (null == increaseRequests || increaseRequests.isEmpty()) {
return null;
}
+
// Pre-process increase requests
     List<SchedContainerChangeRequest> schedIncreaseRequests =
createSchedContainerChangeRequests(increaseRequests, true);
LeafQueue leafQueue = (LeafQueue) app.getQueue();
- synchronized(leafQueue) {
+
+ try {
+ /*
+       * Acquire the application's write lock here to make sure the
+       * application won't finish while updateIncreaseRequests is running.
+ */
+ app.getWriteLock().lock();
// make sure we aren't stopping/removing the application
// when the allocate comes in
if (app.isStopped()) {
@@ -944,8 +990,12 @@ public class CapacityScheduler extends
if (app.updateIncreaseRequests(schedIncreaseRequests)) {
return leafQueue;
}
- return null;
+ } finally {
+ app.getWriteLock().unlock();
}
+
+ return null;
}
@Override
@@ -955,7 +1005,6 @@ public class CapacityScheduler extends
       List<String> blacklistAdditions, List<String> blacklistRemovals,
       List<ContainerResourceChangeRequest> increaseRequests,
       List<ContainerResourceChangeRequest> decreaseRequests) {
-
FiCaSchedulerApp application = getApplicationAttempt(applicationAttemptId);
if (application == null) {
return EMPTY_ALLOCATION;
@@ -965,42 +1014,43 @@ public class CapacityScheduler extends
releaseContainers(release, application);
// update increase requests
- LeafQueue updateDemandForQueue =
- updateIncreaseRequests(increaseRequests, application);
+ LeafQueue updateDemandForQueue = updateIncreaseRequests(increaseRequests,
+ application);
// Decrease containers
decreaseContainers(decreaseRequests, application);
// Sanity check for new allocation requests
- SchedulerUtils.normalizeRequests(
- ask, getResourceCalculator(), getClusterResource(),
- getMinimumResourceCapability(), getMaximumResourceCapability());
+ SchedulerUtils.normalizeRequests(ask, getResourceCalculator(),
+ getClusterResource(), getMinimumResourceCapability(),
+ getMaximumResourceCapability());
Allocation allocation;
- synchronized (application) {
-
- // make sure we aren't stopping/removing the application
- // when the allocate comes in
+ // make sure we aren't stopping/removing the application
+ // when the allocate comes in
+ try {
+ application.getWriteLock().lock();
if (application.isStopped()) {
return EMPTY_ALLOCATION;
}
// Process resource requests
if (!ask.isEmpty()) {
- if(LOG.isDebugEnabled()) {
- LOG.debug("allocate: pre-update " + applicationAttemptId +
- " ask size =" + ask.size());
+ if (LOG.isDebugEnabled()) {
+ LOG.debug(
+ "allocate: pre-update " + applicationAttemptId + " ask size ="
+ + ask.size());
application.showRequests();
}
// Update application requests
- if (application.updateResourceRequests(ask)
- && (updateDemandForQueue == null)) {
+ if (application.updateResourceRequests(ask) && (updateDemandForQueue
+ == null)) {
updateDemandForQueue = (LeafQueue) application.getQueue();
}
- if(LOG.isDebugEnabled()) {
+ if (LOG.isDebugEnabled()) {
LOG.debug("allocate: post-update");
application.showRequests();
}
@@ -1010,6 +1060,8 @@ public class CapacityScheduler extends
allocation = application.getAllocation(getResourceCalculator(),
getClusterResource(), getMinimumResourceCapability());
+ } finally {
+ application.getWriteLock().unlock();
}
if (updateDemandForQueue != null && !application
@@ -1018,7 +1070,6 @@ public class CapacityScheduler extends
}
return allocation;
-
}
@Override
@@ -1048,142 +1099,159 @@ public class CapacityScheduler extends
return root.getQueueUserAclInfo(user);
}
- private synchronized void nodeUpdate(RMNode nm) {
- if (LOG.isDebugEnabled()) {
- LOG.debug("nodeUpdate: " + nm +
- " clusterResources: " + getClusterResource());
- }
+ private void nodeUpdate(RMNode nm) {
+ try {
+ writeLock.lock();
+ if (LOG.isDebugEnabled()) {
+ LOG.debug(
+ "nodeUpdate: " + nm + " clusterResources: " + getClusterResource());
+ }
- Resource releaseResources = Resource.newInstance(0, 0);
+ Resource releaseResources = Resource.newInstance(0, 0);
- FiCaSchedulerNode node = getNode(nm.getNodeID());
-
-    List<UpdatedContainerInfo> containerInfoList = nm.pullContainerUpdates();
-    List<ContainerStatus> newlyLaunchedContainers = new ArrayList<ContainerStatus>();
-    List<ContainerStatus> completedContainers = new ArrayList<ContainerStatus>();
- for(UpdatedContainerInfo containerInfo : containerInfoList) {
- newlyLaunchedContainers.addAll(containerInfo.getNewlyLaunchedContainers());
- completedContainers.addAll(containerInfo.getCompletedContainers());
- }
-
- // Processing the newly launched containers
- for (ContainerStatus launchedContainer : newlyLaunchedContainers) {
- containerLaunchedOnNode(launchedContainer.getContainerId(), node);
- }
-
- // Processing the newly increased containers
-    List<Container> newlyIncreasedContainers =
- nm.pullNewlyIncreasedContainers();
- for (Container container : newlyIncreasedContainers) {
- containerIncreasedOnNode(container.getId(), node, container);
- }
+ FiCaSchedulerNode node = getNode(nm.getNodeID());
- // Process completed containers
- int releasedContainers = 0;
- for (ContainerStatus completedContainer : completedContainers) {
- ContainerId containerId = completedContainer.getContainerId();
- RMContainer container = getRMContainer(containerId);
- super.completedContainer(container, completedContainer,
- RMContainerEventType.FINISHED);
- if (container != null) {
- releasedContainers++;
- Resource rs = container.getAllocatedResource();
- if (rs != null) {
- Resources.addTo(releaseResources, rs);
- }
- rs = container.getReservedResource();
- if (rs != null) {
- Resources.addTo(releaseResources, rs);
+      List<UpdatedContainerInfo> containerInfoList = nm.pullContainerUpdates();
+      List<ContainerStatus> newlyLaunchedContainers =
+          new ArrayList<ContainerStatus>();
+      List<ContainerStatus> completedContainers =
+          new ArrayList<ContainerStatus>();
+ for (UpdatedContainerInfo containerInfo : containerInfoList) {
+ newlyLaunchedContainers.addAll(
+ containerInfo.getNewlyLaunchedContainers());
+ completedContainers.addAll(containerInfo.getCompletedContainers());
+ }
+
+ // Processing the newly launched containers
+ for (ContainerStatus launchedContainer : newlyLaunchedContainers) {
+ containerLaunchedOnNode(launchedContainer.getContainerId(), node);
+ }
+
+ // Processing the newly increased containers
+      List<Container> newlyIncreasedContainers =
+ nm.pullNewlyIncreasedContainers();
+ for (Container container : newlyIncreasedContainers) {
+ containerIncreasedOnNode(container.getId(), node, container);
+ }
+
+ // Process completed containers
+ int releasedContainers = 0;
+ for (ContainerStatus completedContainer : completedContainers) {
+ ContainerId containerId = completedContainer.getContainerId();
+ RMContainer container = getRMContainer(containerId);
+ super.completedContainer(container, completedContainer,
+ RMContainerEventType.FINISHED);
+ if (container != null) {
+ releasedContainers++;
+ Resource rs = container.getAllocatedResource();
+ if (rs != null) {
+ Resources.addTo(releaseResources, rs);
+ }
+ rs = container.getReservedResource();
+ if (rs != null) {
+ Resources.addTo(releaseResources, rs);
+ }
}
}
- }
- // If the node is decommissioning, send an update to have the total
- // resource equal to the used resource, so no available resource to
- // schedule.
- // TODO: Fix possible race-condition when request comes in before
- // update is propagated
- if (nm.getState() == NodeState.DECOMMISSIONING) {
- this.rmContext
- .getDispatcher()
- .getEventHandler()
- .handle(
- new RMNodeResourceUpdateEvent(nm.getNodeID(), ResourceOption
- .newInstance(getSchedulerNode(nm.getNodeID())
- .getAllocatedResource(), 0)));
- }
- schedulerHealth.updateSchedulerReleaseDetails(lastNodeUpdateTime,
- releaseResources);
- schedulerHealth.updateSchedulerReleaseCounts(releasedContainers);
+ // If the node is decommissioning, send an update to have the total
+ // resource equal to the used resource, so no available resource to
+ // schedule.
+ // TODO: Fix possible race-condition when request comes in before
+ // update is propagated
+ if (nm.getState() == NodeState.DECOMMISSIONING) {
+ this.rmContext.getDispatcher().getEventHandler().handle(
+ new RMNodeResourceUpdateEvent(nm.getNodeID(), ResourceOption
+ .newInstance(
+ getSchedulerNode(nm.getNodeID()).getAllocatedResource(),
+ 0)));
+ }
+ schedulerHealth.updateSchedulerReleaseDetails(lastNodeUpdateTime,
+ releaseResources);
+ schedulerHealth.updateSchedulerReleaseCounts(releasedContainers);
- // Updating node resource utilization
- node.setAggregatedContainersUtilization(
- nm.getAggregatedContainersUtilization());
- node.setNodeUtilization(nm.getNodeUtilization());
+ // Updating node resource utilization
+ node.setAggregatedContainersUtilization(
+ nm.getAggregatedContainersUtilization());
+ node.setNodeUtilization(nm.getNodeUtilization());
- // Now node data structures are upto date and ready for scheduling.
- if(LOG.isDebugEnabled()) {
- LOG.debug("Node being looked for scheduling " + nm +
- " availableResource: " + node.getUnallocatedResource());
+ // Now node data structures are upto date and ready for scheduling.
+ if (LOG.isDebugEnabled()) {
+ LOG.debug(
+ "Node being looked for scheduling " + nm + " availableResource: "
+ + node.getUnallocatedResource());
+ }
+ } finally {
+ writeLock.unlock();
}
}
/**
* Process resource update on a node.
*/
- private synchronized void updateNodeAndQueueResource(RMNode nm,
+ private void updateNodeAndQueueResource(RMNode nm,
ResourceOption resourceOption) {
- updateNodeResource(nm, resourceOption);
- Resource clusterResource = getClusterResource();
- root.updateClusterResource(clusterResource, new ResourceLimits(
- clusterResource));
+ try {
+ writeLock.lock();
+ updateNodeResource(nm, resourceOption);
+ Resource clusterResource = getClusterResource();
+ root.updateClusterResource(clusterResource,
+ new ResourceLimits(clusterResource));
+ } finally {
+ writeLock.unlock();
+ }
}
/**
* Process node labels update on a node.
*/
- private synchronized void updateLabelsOnNode(NodeId nodeId,
+ private void updateLabelsOnNode(NodeId nodeId,
      Set<String> newLabels) {
- FiCaSchedulerNode node = nodeTracker.getNode(nodeId);
- if (null == node) {
- return;
- }
-
- // Get new partition, we have only one partition per node
- String newPartition;
- if (newLabels.isEmpty()) {
- newPartition = RMNodeLabelsManager.NO_LABEL;
- } else {
- newPartition = newLabels.iterator().next();
- }
-
- // old partition as well
- String oldPartition = node.getPartition();
-
- // Update resources of these containers
- for (RMContainer rmContainer : node.getCopiedListOfRunningContainers()) {
- FiCaSchedulerApp application =
- getApplicationAttempt(rmContainer.getApplicationAttemptId());
- if (null != application) {
- application.nodePartitionUpdated(rmContainer, oldPartition,
- newPartition);
- } else {
- LOG.warn("There's something wrong, some RMContainers running on"
- + " a node, but we cannot find SchedulerApplicationAttempt for it. Node="
- + node.getNodeID() + " applicationAttemptId="
- + rmContainer.getApplicationAttemptId());
- continue;
+ try {
+ writeLock.lock();
+ FiCaSchedulerNode node = nodeTracker.getNode(nodeId);
+ if (null == node) {
+ return;
}
+
+ // Get new partition, we have only one partition per node
+ String newPartition;
+ if (newLabels.isEmpty()) {
+ newPartition = RMNodeLabelsManager.NO_LABEL;
+      } else {
+ newPartition = newLabels.iterator().next();
+ }
+
+ // old partition as well
+ String oldPartition = node.getPartition();
+
+ // Update resources of these containers
+ for (RMContainer rmContainer : node.getCopiedListOfRunningContainers()) {
+ FiCaSchedulerApp application = getApplicationAttempt(
+ rmContainer.getApplicationAttemptId());
+ if (null != application) {
+ application.nodePartitionUpdated(rmContainer, oldPartition,
+ newPartition);
+        } else {
+ LOG.warn("There's something wrong, some RMContainers running on"
+ + " a node, but we cannot find SchedulerApplicationAttempt "
+ + "for it. Node=" + node.getNodeID() + " applicationAttemptId="
+ + rmContainer.getApplicationAttemptId());
+ continue;
+ }
+ }
+
+ // Unreserve container on this node
+ RMContainer reservedContainer = node.getReservedContainer();
+ if (null != reservedContainer) {
+ killReservedContainer(reservedContainer);
+ }
+
+ // Update node labels after we've done this
+ node.updateLabels(newLabels);
+ } finally {
+ writeLock.unlock();
}
-
- // Unreserve container on this node
- RMContainer reservedContainer = node.getReservedContainer();
- if (null != reservedContainer) {
- killReservedContainer(reservedContainer);
- }
-
- // Update node labels after we've done this
- node.updateLabels(newLabels);
}
private void updateSchedulerHealth(long now, FiCaSchedulerNode node,
@@ -1218,134 +1286,134 @@ public class CapacityScheduler extends
}
@VisibleForTesting
- public synchronized void allocateContainersToNode(FiCaSchedulerNode node) {
- if (rmContext.isWorkPreservingRecoveryEnabled()
- && !rmContext.isSchedulerReadyForAllocatingContainers()) {
- return;
- }
-
- if (!nodeTracker.exists(node.getNodeID())) {
- LOG.info("Skipping scheduling as the node " + node.getNodeID() +
- " has been removed");
- return;
- }
-
- // reset allocation and reservation stats before we start doing any work
- updateSchedulerHealth(lastNodeUpdateTime, node,
- new CSAssignment(Resources.none(), NodeType.NODE_LOCAL));
-
- CSAssignment assignment;
-
- // Assign new containers...
- // 1. Check for reserved applications
- // 2. Schedule if there are no reservations
-
- RMContainer reservedContainer = node.getReservedContainer();
- if (reservedContainer != null) {
-
- FiCaSchedulerApp reservedApplication =
- getCurrentAttemptForContainer(reservedContainer.getContainerId());
-
- // Try to fulfill the reservation
- LOG.info("Trying to fulfill reservation for application "
- + reservedApplication.getApplicationId() + " on node: "
- + node.getNodeID());
-
- LeafQueue queue = ((LeafQueue) reservedApplication.getQueue());
- assignment =
- queue.assignContainers(
- getClusterResource(),
- node,
- // TODO, now we only consider limits for parent for non-labeled
- // resources, should consider labeled resources as well.
- new ResourceLimits(labelManager.getResourceByLabel(
- RMNodeLabelsManager.NO_LABEL, getClusterResource())),
- SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
- if (assignment.isFulfilledReservation()) {
- CSAssignment tmp =
- new CSAssignment(reservedContainer.getReservedResource(),
- assignment.getType());
- Resources.addTo(assignment.getAssignmentInformation().getAllocated(),
- reservedContainer.getReservedResource());
- tmp.getAssignmentInformation().addAllocationDetails(
- reservedContainer.getContainerId(), queue.getQueuePath());
- tmp.getAssignmentInformation().incrAllocations();
- updateSchedulerHealth(lastNodeUpdateTime, node, tmp);
- schedulerHealth.updateSchedulerFulfilledReservationCounts(1);
-
- ActivitiesLogger.QUEUE.recordQueueActivity(activitiesManager, node,
- queue.getParent().getQueueName(), queue.getQueueName(),
- ActivityState.ACCEPTED, ActivityDiagnosticConstant.EMPTY);
- ActivitiesLogger.NODE.finishAllocatedNodeAllocation(activitiesManager,
- node, reservedContainer.getContainerId(),
- AllocationState.ALLOCATED_FROM_RESERVED);
- } else {
- ActivitiesLogger.QUEUE.recordQueueActivity(activitiesManager, node,
- queue.getParent().getQueueName(), queue.getQueueName(),
- ActivityState.ACCEPTED, ActivityDiagnosticConstant.EMPTY);
- ActivitiesLogger.NODE.finishAllocatedNodeAllocation(activitiesManager,
- node, reservedContainer.getContainerId(), AllocationState.SKIPPED);
+ public void allocateContainersToNode(FiCaSchedulerNode node) {
+ try {
+ writeLock.lock();
+ if (rmContext.isWorkPreservingRecoveryEnabled() && !rmContext
+ .isSchedulerReadyForAllocatingContainers()) {
+ return;
}
- }
- // Try to schedule more if there are no reservations to fulfill
- if (node.getReservedContainer() == null) {
- if (calculator.computeAvailableContainers(Resources
- .add(node.getUnallocatedResource(), node.getTotalKillableResources()),
- minimumAllocation) > 0) {
+ if (!nodeTracker.exists(node.getNodeID())) {
+ LOG.info("Skipping scheduling as the node " + node.getNodeID()
+ + " has been removed");
+ return;
+ }
- if (LOG.isDebugEnabled()) {
- LOG.debug("Trying to schedule on node: " + node.getNodeName() +
- ", available: " + node.getUnallocatedResource());
- }
+ // reset allocation and reservation stats before we start doing any work
+ updateSchedulerHealth(lastNodeUpdateTime, node,
+ new CSAssignment(Resources.none(), NodeType.NODE_LOCAL));
- assignment = root.assignContainers(
- getClusterResource(),
- node,
- new ResourceLimits(labelManager.getResourceByLabel(
- node.getPartition(), getClusterResource())),
- SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
- if (Resources.greaterThan(calculator, getClusterResource(),
- assignment.getResource(), Resources.none())) {
- updateSchedulerHealth(lastNodeUpdateTime, node, assignment);
- return;
- }
-
- // Only do non-exclusive allocation when node has node-labels.
- if (StringUtils.equals(node.getPartition(),
- RMNodeLabelsManager.NO_LABEL)) {
- return;
- }
-
- // Only do non-exclusive allocation when the node-label supports that
- try {
- if (rmContext.getNodeLabelManager().isExclusiveNodeLabel(
- node.getPartition())) {
- return;
- }
- } catch (IOException e) {
- LOG.warn("Exception when trying to get exclusivity of node label="
- + node.getPartition(), e);
- return;
- }
-
- // Try to use NON_EXCLUSIVE
- assignment = root.assignContainers(
- getClusterResource(),
- node,
+ CSAssignment assignment;
+
+ // Assign new containers...
+ // 1. Check for reserved applications
+ // 2. Schedule if there are no reservations
+
+ RMContainer reservedContainer = node.getReservedContainer();
+ if (reservedContainer != null) {
+
+ FiCaSchedulerApp reservedApplication = getCurrentAttemptForContainer(
+ reservedContainer.getContainerId());
+
+ // Try to fulfill the reservation
+ LOG.info("Trying to fulfill reservation for application "
+ + reservedApplication.getApplicationId() + " on node: " + node
+ .getNodeID());
+
+ LeafQueue queue = ((LeafQueue) reservedApplication.getQueue());
+ assignment = queue.assignContainers(getClusterResource(), node,
// TODO, now we only consider limits for parent for non-labeled
// resources, should consider labeled resources as well.
- new ResourceLimits(labelManager.getResourceByLabel(
- RMNodeLabelsManager.NO_LABEL, getClusterResource())),
- SchedulingMode.IGNORE_PARTITION_EXCLUSIVITY);
- updateSchedulerHealth(lastNodeUpdateTime, node, assignment);
+ new ResourceLimits(labelManager
+ .getResourceByLabel(RMNodeLabelsManager.NO_LABEL,
+ getClusterResource())),
+ SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
+ if (assignment.isFulfilledReservation()) {
+ CSAssignment tmp = new CSAssignment(
+ reservedContainer.getReservedResource(), assignment.getType());
+ Resources.addTo(assignment.getAssignmentInformation().getAllocated(),
+ reservedContainer.getReservedResource());
+ tmp.getAssignmentInformation().addAllocationDetails(
+ reservedContainer.getContainerId(), queue.getQueuePath());
+ tmp.getAssignmentInformation().incrAllocations();
+ updateSchedulerHealth(lastNodeUpdateTime, node, tmp);
+ schedulerHealth.updateSchedulerFulfilledReservationCounts(1);
+
+ ActivitiesLogger.QUEUE.recordQueueActivity(activitiesManager, node,
+ queue.getParent().getQueueName(), queue.getQueueName(),
+ ActivityState.ACCEPTED, ActivityDiagnosticConstant.EMPTY);
+ ActivitiesLogger.NODE.finishAllocatedNodeAllocation(activitiesManager,
+ node, reservedContainer.getContainerId(),
+ AllocationState.ALLOCATED_FROM_RESERVED);
+          } else {
+ ActivitiesLogger.QUEUE.recordQueueActivity(activitiesManager, node,
+ queue.getParent().getQueueName(), queue.getQueueName(),
+ ActivityState.ACCEPTED, ActivityDiagnosticConstant.EMPTY);
+ ActivitiesLogger.NODE.finishAllocatedNodeAllocation(activitiesManager,
+ node, reservedContainer.getContainerId(),
+ AllocationState.SKIPPED);
+ }
}
- } else {
- LOG.info("Skipping scheduling since node "
- + node.getNodeID()
- + " is reserved by application "
- + node.getReservedContainer().getContainerId()
- .getApplicationAttemptId());
+
+ // Try to schedule more if there are no reservations to fulfill
+ if (node.getReservedContainer() == null) {
+ if (calculator.computeAvailableContainers(Resources
+ .add(node.getUnallocatedResource(),
+ node.getTotalKillableResources()), minimumAllocation) > 0) {
+
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Trying to schedule on node: " + node.getNodeName()
+ + ", available: " + node.getUnallocatedResource());
+ }
+
+ assignment = root.assignContainers(getClusterResource(), node,
+ new ResourceLimits(labelManager
+ .getResourceByLabel(node.getPartition(),
+ getClusterResource())),
+ SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
+ if (Resources.greaterThan(calculator, getClusterResource(),
+ assignment.getResource(), Resources.none())) {
+ updateSchedulerHealth(lastNodeUpdateTime, node, assignment);
+ return;
+ }
+
+ // Only do non-exclusive allocation when node has node-labels.
+ if (StringUtils.equals(node.getPartition(),
+ RMNodeLabelsManager.NO_LABEL)) {
+ return;
+ }
+
+ // Only do non-exclusive allocation when the node-label supports that
+ try {
+ if (rmContext.getNodeLabelManager().isExclusiveNodeLabel(
+ node.getPartition())) {
+ return;
+ }
+ } catch (IOException e) {
+ LOG.warn(
+ "Exception when trying to get exclusivity of node label=" + node
+ .getPartition(), e);
+ return;
+ }
+
+ // Try to use NON_EXCLUSIVE
+ assignment = root.assignContainers(getClusterResource(), node,
+ // TODO, now we only consider limits for parent for non-labeled
+ // resources, should consider labeled resources as well.
+ new ResourceLimits(labelManager
+ .getResourceByLabel(RMNodeLabelsManager.NO_LABEL,
+ getClusterResource())),
+ SchedulingMode.IGNORE_PARTITION_EXCLUSIVITY);
+ updateSchedulerHealth(lastNodeUpdateTime, node, assignment);
+ }
+      } else {
+ LOG.info("Skipping scheduling since node " + node.getNodeID()
+ + " is reserved by application " + node.getReservedContainer()
+ .getContainerId().getApplicationAttemptId());
+ }
+ } finally {
+ writeLock.unlock();
}
}
@@ -1498,100 +1566,108 @@ public class CapacityScheduler extends
}
}
- private synchronized void addNode(RMNode nodeManager) {
- FiCaSchedulerNode schedulerNode = new FiCaSchedulerNode(nodeManager,
- usePortForNodeName, nodeManager.getNodeLabels());
- nodeTracker.addNode(schedulerNode);
+ private void addNode(RMNode nodeManager) {
+ try {
+ writeLock.lock();
+ FiCaSchedulerNode schedulerNode = new FiCaSchedulerNode(nodeManager,
+ usePortForNodeName, nodeManager.getNodeLabels());
+ nodeTracker.addNode(schedulerNode);
- // update this node to node label manager
- if (labelManager != null) {
- labelManager.activateNode(nodeManager.getNodeID(),
- schedulerNode.getTotalResource());
- }
+ // update this node to node label manager
+ if (labelManager != null) {
+ labelManager.activateNode(nodeManager.getNodeID(),
+ schedulerNode.getTotalResource());
+ }
- Resource clusterResource = getClusterResource();
- root.updateClusterResource(clusterResource, new ResourceLimits(
- clusterResource));
+ Resource clusterResource = getClusterResource();
+ root.updateClusterResource(clusterResource,
+ new ResourceLimits(clusterResource));
- LOG.info("Added node " + nodeManager.getNodeAddress() +
- " clusterResource: " + clusterResource);
+ LOG.info(
+ "Added node " + nodeManager.getNodeAddress() + " clusterResource: "
+ + clusterResource);
- if (scheduleAsynchronously && getNumClusterNodes() == 1) {
- asyncSchedulerThread.beginSchedule();
+ if (scheduleAsynchronously && getNumClusterNodes() == 1) {
+ asyncSchedulerThread.beginSchedule();
+ }
+ } finally {
+ writeLock.unlock();
}
}
- private synchronized void removeNode(RMNode nodeInfo) {
- // update this node to node label manager
- if (labelManager != null) {
- labelManager.deactivateNode(nodeInfo.getNodeID());
- }
+ private void removeNode(RMNode nodeInfo) {
+ try {
+ writeLock.lock();
+ // update this node to node label manager
+ if (labelManager != null) {
+ labelManager.deactivateNode(nodeInfo.getNodeID());
+ }
- NodeId nodeId = nodeInfo.getNodeID();
- FiCaSchedulerNode node = nodeTracker.getNode(nodeId);
- if (node == null) {
- LOG.error("Attempting to remove non-existent node " + nodeId);
- return;
- }
+ NodeId nodeId = nodeInfo.getNodeID();
+ FiCaSchedulerNode node = nodeTracker.getNode(nodeId);
+ if (node == null) {
+ LOG.error("Attempting to remove non-existent node " + nodeId);
+ return;
+ }
- // Remove running containers
-    List<RMContainer> runningContainers = node.getCopiedListOfRunningContainers();
- for (RMContainer container : runningContainers) {
- super.completedContainer(container,
- SchedulerUtils.createAbnormalContainerStatus(
- container.getContainerId(),
- SchedulerUtils.LOST_CONTAINER),
- RMContainerEventType.KILL);
- }
-
- // Remove reservations, if any
- RMContainer reservedContainer = node.getReservedContainer();
- if (reservedContainer != null) {
- super.completedContainer(reservedContainer,
- SchedulerUtils.createAbnormalContainerStatus(
- reservedContainer.getContainerId(),
- SchedulerUtils.LOST_CONTAINER),
- RMContainerEventType.KILL);
- }
+ // Remove running containers
+      List<RMContainer> runningContainers =
+ node.getCopiedListOfRunningContainers();
+ for (RMContainer container : runningContainers) {
+ super.completedContainer(container, SchedulerUtils
+ .createAbnormalContainerStatus(container.getContainerId(),
+ SchedulerUtils.LOST_CONTAINER), RMContainerEventType.KILL);
+ }
- nodeTracker.removeNode(nodeId);
- Resource clusterResource = getClusterResource();
- root.updateClusterResource(clusterResource, new ResourceLimits(
- clusterResource));
- int numNodes = nodeTracker.nodeCount();
+ // Remove reservations, if any
+ RMContainer reservedContainer = node.getReservedContainer();
+ if (reservedContainer != null) {
+ super.completedContainer(reservedContainer, SchedulerUtils
+ .createAbnormalContainerStatus(reservedContainer.getContainerId(),
+ SchedulerUtils.LOST_CONTAINER), RMContainerEventType.KILL);
+ }
- if (scheduleAsynchronously && numNodes == 0) {
- asyncSchedulerThread.suspendSchedule();
+ nodeTracker.removeNode(nodeId);
+ Resource clusterResource = getClusterResource();
+ root.updateClusterResource(clusterResource,
+ new ResourceLimits(clusterResource));
+ int numNodes = nodeTracker.nodeCount();
+
+ if (scheduleAsynchronously && numNodes == 0) {
+ asyncSchedulerThread.suspendSchedule();
+ }
+
+ LOG.info(
+ "Removed node " + nodeInfo.getNodeAddress() + " clusterResource: "
+ + getClusterResource());
+ } finally {
+ writeLock.unlock();
}
-
- LOG.info("Removed node " + nodeInfo.getNodeAddress() +
- " clusterResource: " + getClusterResource());
}
private void rollbackContainerResource(
ContainerId containerId) {
RMContainer rmContainer = getRMContainer(containerId);
if (rmContainer == null) {
- LOG.info("Cannot rollback resource for container " + containerId +
- ". The container does not exist.");
+ LOG.info("Cannot rollback resource for container " + containerId
+ + ". The container does not exist.");
return;
}
FiCaSchedulerApp application = getCurrentAttemptForContainer(containerId);
if (application == null) {
- LOG.info("Cannot rollback resource for container " + containerId +
- ". The application that the container belongs to does not exist.");
+ LOG.info("Cannot rollback resource for container " + containerId
+ + ". The application that the container "
+ + "belongs to does not exist.");
return;
}
LOG.info("Roll back resource for container " + containerId);
- LeafQueue leafQueue = (LeafQueue) application.getQueue();
- synchronized(leafQueue) {
- SchedulerNode schedulerNode =
- getSchedulerNode(rmContainer.getAllocatedNode());
- SchedContainerChangeRequest decreaseRequest =
- new SchedContainerChangeRequest(this.rmContext, schedulerNode,
- rmContainer, rmContainer.getLastConfirmedResource());
- decreaseContainer(decreaseRequest, application);
- }
+
+ SchedulerNode schedulerNode = getSchedulerNode(
+ rmContainer.getAllocatedNode());
+ SchedContainerChangeRequest decreaseRequest =
+ new SchedContainerChangeRequest(this.rmContext, schedulerNode,
+ rmContainer, rmContainer.getLastConfirmedResource());
+ decreaseContainer(decreaseRequest, application);
}
@Override
@@ -1600,23 +1676,29 @@ public class CapacityScheduler extends
RMContainerEventType event) {
Container container = rmContainer.getContainer();
ContainerId containerId = container.getId();
-
+
// Get the application for the finished container
- FiCaSchedulerApp application =
- getCurrentAttemptForContainer(container.getId());
+ FiCaSchedulerApp application = getCurrentAttemptForContainer(
+ container.getId());
ApplicationId appId =
containerId.getApplicationAttemptId().getApplicationId();
if (application == null) {
- LOG.info("Container " + container + " of" + " finished application "
- + appId + " completed with event " + event);
+ LOG.info(
+ "Container " + container + " of" + " finished application " + appId
+ + " completed with event " + event);
return;
}
-
+
// Get the node on which the container was allocated
FiCaSchedulerNode node = getNode(container.getNodeId());
-
+ if (null == node) {
+ LOG.info("Container " + container + " of" + " removed node " + container
+ .getNodeId() + " completed with event " + event);
+ return;
+ }
+
// Inform the queue
- LeafQueue queue = (LeafQueue)application.getQueue();
+ LeafQueue queue = (LeafQueue) application.getQueue();
queue.completedContainer(getClusterResource(), application, node,
rmContainer, containerStatus, event, null, true);
}
@@ -1627,19 +1709,19 @@ public class CapacityScheduler extends
RMContainer rmContainer = decreaseRequest.getRMContainer();
// Check container status before doing decrease
if (rmContainer.getState() != RMContainerState.RUNNING) {
- LOG.info("Trying to decrease a container not in RUNNING state, container="
- + rmContainer + " state=" + rmContainer.getState().name());
+ LOG.info(
+ "Trying to decrease a container not in RUNNING state, container="
+ + rmContainer + " state=" + rmContainer.getState().name());
return;
}
- FiCaSchedulerApp app = (FiCaSchedulerApp)attempt;
+ FiCaSchedulerApp app = (FiCaSchedulerApp) attempt;
LeafQueue queue = (LeafQueue) attempt.getQueue();
try {
queue.decreaseContainer(getClusterResource(), decreaseRequest, app);
// Notify RMNode that the container can be pulled by NodeManager in the
// next heartbeat
- this.rmContext.getDispatcher().getEventHandler()
- .handle(new RMNodeDecreaseContainerEvent(
- decreaseRequest.getNodeId(),
+ this.rmContext.getDispatcher().getEventHandler().handle(
+ new RMNodeDecreaseContainerEvent(decreaseRequest.getNodeId(),
Collections.singletonList(rmContainer.getContainer())));
} catch (InvalidResourceRequestException e) {
LOG.warn("Error happens when checking decrease request, Ignoring.."
@@ -1700,70 +1782,81 @@ public class CapacityScheduler extends
}
}
- public synchronized void markContainerForKillable(
+ public void markContainerForKillable(
RMContainer killableContainer) {
- if (LOG.isDebugEnabled()) {
- LOG.debug(SchedulerEventType.MARK_CONTAINER_FOR_KILLABLE + ": container"
- + killableContainer.toString());
- }
+ try {
+ writeLock.lock();
+ if (LOG.isDebugEnabled()) {
+ LOG.debug(SchedulerEventType.MARK_CONTAINER_FOR_KILLABLE + ": container"
+ + killableContainer.toString());
+ }
+
+ if (!isLazyPreemptionEnabled) {
+ super.completedContainer(killableContainer, SchedulerUtils
+ .createPreemptedContainerStatus(killableContainer.getContainerId(),
+ SchedulerUtils.PREEMPTED_CONTAINER), RMContainerEventType.KILL);
+      } else {
+ FiCaSchedulerNode node = (FiCaSchedulerNode) getSchedulerNode(
+ killableContainer.getAllocatedNode());
+
+ FiCaSchedulerApp application = getCurrentAttemptForContainer(
+ killableContainer.getContainerId());
+
+ node.markContainerToKillable(killableContainer.getContainerId());
+
+ // notify PreemptionManager
+ // Get the application for the finished container
+ if (null != application) {
+ String leafQueueName = application.getCSLeafQueue().getQueueName();
+ getPreemptionManager().addKillableContainer(
+ new KillableContainer(killableContainer, node.getPartition(),
+ leafQueueName));
+ }
+ }
+ } finally {
+ writeLock.unlock();
+ }
+ }
+
+ private void markContainerForNonKillable(
+ RMContainer nonKillableContainer) {
+ try {
+ writeLock.lock();
+ if (LOG.isDebugEnabled()) {
+ LOG.debug(
+ SchedulerEventType.MARK_CONTAINER_FOR_NONKILLABLE + ": container"
+ + nonKillableContainer.toString());
+ }
- if (!isLazyPreemptionEnabled) {
- super.completedContainer(killableContainer, SchedulerUtils
- .createPreemptedContainerStatus(killableContainer.getContainerId(),
- SchedulerUtils.PREEMPTED_CONTAINER), RMContainerEventType.KILL);
- } else {
FiCaSchedulerNode node = (FiCaSchedulerNode) getSchedulerNode(
- killableContainer.getAllocatedNode());
+ nonKillableContainer.getAllocatedNode());
FiCaSchedulerApp application = getCurrentAttemptForContainer(
- killableContainer.getContainerId());
+ nonKillableContainer.getContainerId());
- node.markContainerToKillable(killableContainer.getContainerId());
+ node.markContainerToNonKillable(nonKillableContainer.getContainerId());
// notify PreemptionManager
// Get the application for the finished container
if (null != application) {
String leafQueueName = application.getCSLeafQueue().getQueueName();
- getPreemptionManager().addKillableContainer(
- new KillableContainer(killableContainer, node.getPartition(),
+ getPreemptionManager().removeKillableContainer(
+ new KillableContainer(nonKillableContainer, node.getPartition(),
leafQueueName));
- } }
- }
-
- private synchronized void markContainerForNonKillable(
- RMContainer nonKillableContainer) {
- if (LOG.isDebugEnabled()) {
- LOG.debug(
- SchedulerEventType.MARK_CONTAINER_FOR_NONKILLABLE + ": container"
- + nonKillableContainer.toString());
- }
-
- FiCaSchedulerNode node = (FiCaSchedulerNode) getSchedulerNode(
- nonKillableContainer.getAllocatedNode());
-
- FiCaSchedulerApp application = getCurrentAttemptForContainer(
- nonKillableContainer.getContainerId());
-
- node.markContainerToNonKillable(nonKillableContainer.getContainerId());
-
- // notify PreemptionManager
- // Get the application for the finished container
- if (null != application) {
- String leafQueueName = application.getCSLeafQueue().getQueueName();
- getPreemptionManager().removeKillableContainer(
- new KillableContainer(nonKillableContainer, node.getPartition(),
- leafQueueName));
+ }
+ } finally {
+ writeLock.unlock();
}
}
@Override
- public synchronized boolean checkAccess(UserGroupInformation callerUGI,
+ public boolean checkAccess(UserGroupInformation callerUGI,
QueueACL acl, String queueName) {
CSQueue queue = getQueue(queueName);
if (queue == null) {
if (LOG.isDebugEnabled()) {
- LOG.debug("ACL not found for queue access-type " + acl
- + " for queue " + queueName);
+ LOG.debug("ACL not found for queue access-type " + acl + " for queue "
+ + queueName);
}
return false;
}
@@ -1802,181 +1895,211 @@ public class CapacityScheduler extends
return planQueueName + ReservationConstants.DEFAULT_QUEUE_SUFFIX;
}
- private synchronized String resolveReservationQueueName(String queueName,
+ private String resolveReservationQueueName(String queueName,
ApplicationId applicationId, ReservationId reservationID,
boolean isRecovering) {
- CSQueue queue = getQueue(queueName);
- // Check if the queue is a plan queue
- if ((queue == null) || !(queue instanceof PlanQueue)) {
- return queueName;
- }
- if (reservationID != null) {
- String resQName = reservationID.toString();
- queue = getQueue(resQName);
- if (queue == null) {
- // reservation has terminated during failover
- if (isRecovering
- && conf.getMoveOnExpiry(getQueue(queueName).getQueuePath())) {
- // move to the default child queue of the plan
- return getDefaultReservationQueueName(queueName);
- }
- String message =
- "Application " + applicationId
- + " submitted to a reservation which is not currently active: "
- + resQName;
- this.rmContext.getDispatcher().getEventHandler()
- .handle(new RMAppEvent(applicationId,
- RMAppEventType.APP_REJECTED, message));
- return null;
- }
- if (!queue.getParent().getQueueName().equals(queueName)) {
- String message =
- "Application: " + applicationId + " submitted to a reservation "
- + resQName + " which does not belong to the specified queue: "
- + queueName;
- this.rmContext.getDispatcher().getEventHandler()
- .handle(new RMAppEvent(applicationId,
- RMAppEventType.APP_REJECTED, message));
- return null;
- }
- // use the reservation queue to run the app
- queueName = resQName;
- } else {
- // use the default child queue of the plan for unreserved apps
- queueName = getDefaultReservationQueueName(queueName);
- }
- return queueName;
- }
-
- @Override
- public synchronized void removeQueue(String queueName)
- throws SchedulerDynamicEditException {
- LOG.info("Removing queue: " + queueName);
- CSQueue q = this.getQueue(queueName);
- if (!(q instanceof ReservationQueue)) {
- throw new SchedulerDynamicEditException("The queue that we are asked "
- + "to remove (" + queueName + ") is not a ReservationQueue");
- }
- ReservationQueue disposableLeafQueue = (ReservationQueue) q;
- // at this point we should have no more apps
- if (disposableLeafQueue.getNumApplications() > 0) {
- throw new SchedulerDynamicEditException("The queue " + queueName
- + " is not empty " + disposableLeafQueue.getApplications().size()
- + " active apps " + disposableLeafQueue.getPendingApplications().size()
- + " pending apps");
- }
-
- ((PlanQueue) disposableLeafQueue.getParent()).removeChildQueue(q);
- this.queues.remove(queueName);
- LOG.info("Removal of ReservationQueue " + queueName + " has succeeded");
- }
-
- @Override
- public synchronized void addQueue(Queue queue)
- throws SchedulerDynamicEditException {
-
- if (!(queue instanceof ReservationQueue)) {
- throw new SchedulerDynamicEditException("Queue " + queue.getQueueName()
- + " is not a ReservationQueue");
- }
-
- ReservationQueue newQueue = (ReservationQueue) queue;
-
- if (newQueue.getParent() == null
- || !(newQueue.getParent() instanceof PlanQueue)) {
- throw new SchedulerDynamicEditException("ParentQueue for "
- + newQueue.getQueueName()
- + " is not properly set (should be set and be a PlanQueue)");
- }
-
- PlanQueue parentPlan = (PlanQueue) newQueue.getParent();
- String queuename = newQueue.getQueueName();
- parentPlan.addChildQueue(newQueue);
- this.queues.put(queuename, newQueue);
- LOG.info("Creation of ReservationQueue " + newQueue + " succeeded");
- }
-
- @Override
- public synchronized void setEntitlement(String inQueue,
- QueueEntitlement entitlement) throws SchedulerDynamicEditException,
- YarnException {
- LeafQueue queue = getAndCheckLeafQueue(inQueue);
- ParentQueue parent = (ParentQueue) queue.getParent();
-
- if (!(queue instanceof ReservationQueue)) {
- throw new SchedulerDynamicEditException("Entitlement can not be"
- + " modified dynamically since queue " + inQueue
- + " is not a ReservationQueue");
- }
-
- if (!(parent instanceof PlanQueue)) {
- throw new SchedulerDynamicEditException("The parent of ReservationQueue "
- + inQueue + " must be an PlanQueue");
- }
-
- ReservationQueue newQueue = (ReservationQueue) queue;
-
- float sumChilds = ((PlanQueue) parent).sumOfChildCapacities();
- float newChildCap = sumChilds - queue.getCapacity() + entitlement.getCapacity();
-
- if (newChildCap >= 0 && newChildCap < 1.0f + CSQueueUtils.EPSILON) {
- // note: epsilon checks here are not ok, as the epsilons might accumulate
- // and become a problem in aggregate
- if (Math.abs(entitlement.getCapacity() - queue.getCapacity()) == 0
- && Math.abs(entitlement.getMaxCapacity() - queue.getMaximumCapacity()) == 0) {
- return;
- }
- newQueue.setEntitlement(entitlement);
- } else {
- throw new SchedulerDynamicEditException(
- "Sum of child queues would exceed 100% for PlanQueue: "
- + parent.getQueueName());
- }
- LOG.info("Set entitlement for ReservationQueue " + inQueue + " to "
- + queue.getCapacity() + " request was (" + entitlement.getCapacity() + ")");
- }
-
- @Override
- public synchronized String moveApplication(ApplicationId appId,
- String targetQueueName) throws YarnException {
- FiCaSchedulerApp app =
- getApplicationAttempt(ApplicationAttemptId.newInstance(appId, 0));
- String sourceQueueName = app.getQueue().getQueueName();
- LeafQueue source = getAndCheckLeafQueue(sourceQueueName);
- String destQueueName = handleMoveToPlanQueue(targetQueueName);
- LeafQueue dest = getAndCheckLeafQueue(destQueueName);
- // Validation check - ACLs, submission limits for user & queue
- String user = app.getUser();
- checkQueuePartition(app, dest);
try {
- dest.submitApplication(appId, user, destQueueName);
- } catch (AccessControlException e) {
- throw new YarnException(e);
+ readLock.lock();
+ CSQueue queue = getQueue(queueName);
+ // Check if the queue is a plan queue
+ if ((queue == null) || !(queue instanceof PlanQueue)) {
+ return queueName;
+ }
+ if (reservationID != null) {
+ String resQName = reservationID.toString();
+ queue = getQueue(resQName);
+ if (queue == null) {
+ // reservation has terminated during failover
+ if (isRecovering && conf.getMoveOnExpiry(
+ getQueue(queueName).getQueuePath())) {
+ // move to the default child queue of the plan
+ return getDefaultReservationQueueName(queueName);
+ }
+ String message = "Application " + applicationId
+ + " submitted to a reservation which is not currently active: "
+ + resQName;
+ this.rmContext.getDispatcher().getEventHandler().handle(
+ new RMAppEvent(applicationId, RMAppEventType.APP_REJECTED,
+ message));
+ return null;
+ }
+ if (!queue.getParent().getQueueName().equals(queueName)) {
+ String message =
+ "Application: " + applicationId + " submitted to a reservation "
+ + resQName + " which does not belong to the specified queue: "
+ + queueName;
+ this.rmContext.getDispatcher().getEventHandler().handle(
+ new RMAppEvent(applicationId, RMAppEventType.APP_REJECTED,
+ message));
+ return null;
+ }
+ // use the reservation queue to run the app
+ queueName = resQName;
+      } else {
+ // use the default child queue of the plan for unreserved apps
+ queueName = getDefaultReservationQueueName(queueName);
+ }
+ return queueName;
+ } finally {
+ readLock.unlock();
}
- // Move all live containers
- for (RMContainer rmContainer : app.getLiveContainers()) {
- source.detachContainer(getClusterResource(), app, rmContainer);
- // attach the Container to another queue
- dest.attachContainer(getClusterResource(), app, rmContainer);
+
+ }
+
+ @Override
+ public void removeQueue(String queueName)
+ throws SchedulerDynamicEditException {
+ try {
+ writeLock.lock();
+ LOG.info("Removing queue: " + queueName);
+ CSQueue q = this.getQueue(queueName);
+ if (!(q instanceof ReservationQueue)) {
+ throw new SchedulerDynamicEditException(
+ "The queue that we are asked " + "to remove (" + queueName
+ + ") is not a ReservationQueue");
+ }
+ ReservationQueue disposableLeafQueue = (ReservationQueue) q;
+ // at this point we should have no more apps
+ if (disposableLeafQueue.getNumApplications() > 0) {
+ throw new SchedulerDynamicEditException(
+ "The queue " + queueName + " is not empty " + disposableLeafQueue
+ .getApplications().size() + " active apps "
+ + disposableLeafQueue.getPendingApplications().size()
+ + " pending apps");
+ }
+
+ ((PlanQueue) disposableLeafQueue.getParent()).removeChildQueue(q);
+ this.queues.remove(queueName);
+ LOG.info("Removal of ReservationQueue " + queueName + " has succeeded");
+ } finally {
+ writeLock.unlock();
+ }
+ }
+
+ @Override
+ public void addQueue(Queue queue)
+ throws SchedulerDynamicEditException {
+ try {
+ writeLock.lock();
+ if (!(queue instanceof ReservationQueue)) {
+ throw new SchedulerDynamicEditException(
+ "Queue " + queue.getQueueName() + " is not a ReservationQueue");
+ }
+
+ ReservationQueue newQueue = (ReservationQueue) queue;
+
+ if (newQueue.getParent() == null || !(newQueue
+ .getParent() instanceof PlanQueue)) {
+ throw new SchedulerDynamicEditException(
+ "ParentQueue for " + newQueue.getQueueName()
+ + " is not properly set (should be set and be a PlanQueue)");
+ }
+
+ PlanQueue parentPlan = (PlanQueue) newQueue.getParent();
+ String queuename = newQueue.getQueueName();
+ parentPlan.addChildQueue(newQueue);
+ this.queues.put(queuename, newQueue);
+ LOG.info("Creation of ReservationQueue " + newQueue + " succeeded");
+ } finally {
+ writeLock.unlock();
+ }
+ }
+
+ @Override
+ public void setEntitlement(String inQueue, QueueEntitlement entitlement)
+ throws YarnException {
+ try {
+ writeLock.lock();
+ LeafQueue queue = getAndCheckLeafQueue(inQueue);
+ ParentQueue parent = (ParentQueue) queue.getParent();
+
+ if (!(queue instanceof ReservationQueue)) {
+ throw new SchedulerDynamicEditException(
+ "Entitlement can not be" + " modified dynamically since queue "
+ + inQueue + " is not a ReservationQueue");
+ }
+
+ if (!(parent instanceof PlanQueue)) {
+ throw new SchedulerDynamicEditException(
+ "The parent of ReservationQueue " + inQueue
+ + " must be an PlanQueue");
+ }
+
+ ReservationQueue newQueue = (ReservationQueue) queue;
+
+ float sumChilds = ((PlanQueue) parent).sumOfChildCapacities();
+ float newChildCap =
+ sumChilds - queue.getCapacity() + entitlement.getCapacity();
+
+ if (newChildCap >= 0 && newChildCap < 1.0f + CSQueueUtils.EPSILON) {
+ // note: epsilon checks here are not ok, as the epsilons might
+ // accumulate and become a problem in aggregate
+ if (Math.abs(entitlement.getCapacity() - queue.getCapacity()) == 0
+ && Math.abs(
+ entitlement.getMaxCapacity() - queue.getMaximumCapacity()) == 0) {
+ return;
+ }
+ newQueue.setEntitlement(entitlement);
+      } else {
+ throw new SchedulerDynamicEditException(
+ "Sum of child queues would exceed 100% for PlanQueue: " + parent
+ .getQueueName());
+ }
+ LOG.info(
+ "Set entitlement for ReservationQueue " + inQueue + " to " + queue
+ .getCapacity() + " request was (" + entitlement.getCapacity()
+ + ")");
+ } finally {
+ writeLock.unlock();
+ }
+ }
+
+ @Override
+ public String moveApplication(ApplicationId appId,
+ String targetQueueName) throws YarnException {
+ try {
+ writeLock.lock();
+ FiCaSchedulerApp app = getApplicationAttempt(
+ ApplicationAttemptId.newInstance(appId, 0));
+ String sourceQueueName = app.getQueue().getQueueName();
+ LeafQueue source = getAndCheckLeafQueue(sourceQueueName);
+ String destQueueName = handleMoveToPlanQueue(targetQueueName);
+ LeafQueue dest = getAndCheckLeafQueue(destQueueName);
+ // Validation check - ACLs, submission limits for user & queue
+ String user = app.getUser();
+ checkQueuePartition(app, dest);
+ try {
+ dest.submitApplication(appId, user, destQueueName);
+ } catch (AccessControlException e) {
+ throw new YarnException(e);
+ }
+ // Move all live containers
+ for (RMContainer rmContainer : app.getLiveContainers()) {
+ source.detachContainer(getClusterResource(), app, rmContainer);
+ // attach the Container to another queue
+ dest.attachContainer(getClusterResource(), app, rmContainer);
+ }
+ // Detach the application..
+ source.finishApplicationAttempt(app, sourceQueueName);
+ source.getParent().finishApplication(appId, app.getUser());
+ // Finish app & update metrics
+ app.move(dest);
+ // Submit to a new queue
+ dest.submitApplicationAttempt(app, user);
+ applications.get(appId).setQueue(dest);
+ LOG.info("App: " + app.getApplicationId() + " successfully moved from "
+ + sourceQueueName + " to: " + destQueueName);
+ return targetQueueName;
+ } finally {
+ writeLock.unlock();
}
- // Detach the application..
- source.finishApplicationAttempt(app, sourceQueueName);
- source.getParent().finishApplication(appId, app.getUser());
- // Finish app & update metrics
- app.move(dest);
- // Submit to a new queue
- dest.submitApplicationAttempt(app, user);
- applications.get(appId).setQueue(dest);
- LOG.info("App: " + app.getApplicationId() + " successfully moved from "
- + sourceQueueName + " to: " + destQueueName);
- return targetQueueName;
}
/**
* Check application can be moved to queue with labels enabled. All labels in
* application life time will be checked
*
- * @param appId
+ * @param app
* @param dest
* @throws YarnException
*/
@@ -2166,16 +2289,8 @@ public class CapacityScheduler extends
// As we use iterator over a TreeSet for OrderingPolicy, once we change
// priority then reinsert back to make order correct.
LeafQueue queue = (LeafQueue) getQueue(rmApp.getQueue());
- synchronized (queue) {
- queue.getOrderingPolicy().removeSchedulableEntity(
- application.getCurrentAppAttempt());
- // Update new priority in SchedulerApplication
- application.setPriority(appPriority);
-
- queue.getOrderingPolicy().addSchedulableEntity(
- application.getCurrentAppAttempt());
- }
+ queue.updateApplicationPriority(application, appPriority);
// Update the changed application state to timeline server
rmContext.getSystemMetricsPublisher().appUpdated(rmApp,
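
Every CapacityScheduler hunk above follows the same conversion: the method-level synchronized keyword is dropped and the body runs under the scheduler's ReentrantReadWriteLock, released in a finally block. The following is a minimal, self-contained sketch of that discipline for reference; the class and field names are illustrative, not taken from the patch.

    import java.util.concurrent.locks.ReentrantReadWriteLock;

    // Illustrative only: mirrors the write-lock/finally pattern used by
    // nodeUpdate(), addNode(), removeNode(), allocateContainersToNode(), etc.
    class SchedulerLockSketch {
      private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
      private final ReentrantReadWriteLock.ReadLock readLock = lock.readLock();
      private final ReentrantReadWriteLock.WriteLock writeLock = lock.writeLock();

      private int numNodes; // stands in for mutable scheduler state

      // State mutations take the exclusive write lock.
      void addNode() {
        writeLock.lock();
        try {
          numNodes++;
        } finally {
          writeLock.unlock(); // released even if the body throws or returns early
        }
      }

      // Read-only paths share the read lock and may run concurrently.
      int getNumNodes() {
        readLock.lock();
        try {
          return numNodes;
        } finally {
          readLock.unlock();
        }
      }
    }

The patch places lock() as the first statement inside the try block rather than just before it; the effect is the same in practice, though lock-before-try, as sketched here, is the more common idiom because the finally block then never runs without the lock held.
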
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java
index 6129772a5b1..eecd4ba4a3d 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java
@@ -2227,6 +2227,22 @@ public class LeafQueue extends AbstractCSQueue {
}
}
+  public void updateApplicationPriority(SchedulerApplication<FiCaSchedulerApp> app,
+ Priority newAppPriority) {
+ try {
+ writeLock.lock();
+ FiCaSchedulerApp attempt = app.getCurrentAppAttempt();
+ getOrderingPolicy().removeSchedulableEntity(attempt);
+
+ // Update new priority in SchedulerApplication
+ attempt.setPriority(newAppPriority);
+
+ getOrderingPolicy().addSchedulableEntity(attempt);
+ } finally {
+ writeLock.unlock();
+ }
+ }
+
   public OrderingPolicy<FiCaSchedulerApp>
getPendingAppsOrderingPolicy() {
return pendingOrderingPolicy;
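
The new LeafQueue.updateApplicationPriority() preserves the remove / set-priority / re-insert sequence from the deleted synchronized block, for the reason the comment in the CapacityScheduler hunk above gives: the ordering policy iterates a TreeSet, and a TreeSet positions an element only at insertion time, so mutating the sort key in place strands the element. A small, self-contained illustration of that failure mode; the App type and priorities here are hypothetical, not part of the patch.

    import java.util.Comparator;
    import java.util.TreeSet;

    class TreeSetReorderSketch {
      static final class App {
        final String id;
        int priority;
        App(String id, int priority) { this.id = id; this.priority = priority; }
      }

      public static void main(String[] args) {
        TreeSet<App> policy = new TreeSet<>(
            Comparator.comparingInt((App a) -> a.priority).thenComparing(a -> a.id));
        App low = new App("app-1", 1);
        policy.add(new App("app-2", 3));
        policy.add(low);
        policy.add(new App("app-3", 5));

        // Wrong: change the sort key while the element is still in the set.
        low.priority = 10;
        System.out.println(policy.contains(low)); // false - lookups now miss it
        System.out.println(policy.remove(low));   // false - the entry is stranded

        // Right: remove under the old key, update, then re-insert
        // (the sequence updateApplicationPriority performs under writeLock).
        low.priority = 1;
        policy.remove(low);
        low.priority = 10;
        policy.add(low);
        System.out.println(policy.contains(low)); // true
      }
    }
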
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java
index fd43e748fc2..aa7ad500a49 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java
@@ -23,6 +23,7 @@ import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -666,6 +667,9 @@ public class FiCaSchedulerApp extends SchedulerApplicationAttempt {
} finally {
writeLock.unlock();
}
+ }
+ public ReentrantReadWriteLock.WriteLock getWriteLock() {
+ return this.writeLock;
}
}
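
FiCaSchedulerApp now hands out its write lock, which the allocate() hunk at the top of this patch releases in a finally block after computing the allocation; the point is to serialize a multi-step, per-application update without holding a scheduler-wide lock. A minimal sketch of that caller-side pattern; the names below are illustrative stand-ins, not the real YARN APIs.

    import java.util.concurrent.locks.ReentrantReadWriteLock;

    // Illustrative only: a per-application lock handed to the caller so a
    // multi-step update stays atomic without a global scheduler lock.
    class AppAttemptSketch {
      private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
      private int pendingRequests;

      ReentrantReadWriteLock.WriteLock getWriteLock() {
        return lock.writeLock(); // same WriteLock instance on every call
      }

      void updateResourceRequests(int asks) { pendingRequests += asks; }

      int getAllocation() {
        int granted = Math.min(pendingRequests, 1);
        pendingRequests -= granted;
        return granted;
      }
    }

    class AllocateSketch {
      // Mirrors the allocate() hunk: lock one application, apply the new asks
      // and compute the allocation together, release the lock in finally.
      static int allocate(AppAttemptSketch app, int asks) {
        app.getWriteLock().lock();
        try {
          app.updateResourceRequests(asks);
          return app.getAllocation();
        } finally {
          app.getWriteLock().unlock();
        }
      }
    }
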
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java
index 920052f1783..8daf0f333c2 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java
@@ -186,10 +186,13 @@ public class FairScheduler extends
// an app can be reserved on
protected boolean sizeBasedWeight; // Give larger weights to larger jobs
- protected boolean continuousSchedulingEnabled; // Continuous Scheduling enabled or not
- protected int continuousSchedulingSleepMs; // Sleep time for each pass in continuous scheduling
+ // Continuous Scheduling enabled or not
+ protected boolean continuousSchedulingEnabled;
+ // Sleep time for each pass in continuous scheduling
+ protected volatile int continuousSchedulingSleepMs;
+ // Node available resource comparator
   private Comparator<NodeId> nodeAvailableResourceComparator =
- new NodeAvailableResourceComparator(); // Node available resource comparator
+ new NodeAvailableResourceComparator();
protected double nodeLocalityThreshold; // Cluster threshold for node locality
protected double rackLocalityThreshold; // Cluster threshold for rack locality
protected long nodeLocalityDelayMs; // Delay for node locality
@@ -338,36 +341,40 @@ public class FairScheduler extends
* fair shares, deficits, minimum slot allocations, and amount of used and
* required resources per job.
*/
- protected synchronized void update() {
- long start = getClock().getTime();
- updateStarvationStats(); // Determine if any queues merit preemption
+ protected void update() {
+ try {
+ writeLock.lock();
+ long start = getClock().getTime();
+ updateStarvationStats(); // Determine if any queues merit preemption
- FSQueue rootQueue = queueMgr.getRootQueue();
+ FSQueue rootQueue = queueMgr.getRootQueue();
- // Recursively update demands for all queues
- rootQueue.updateDemand();
+ // Recursively update demands for all queues
+ rootQueue.updateDemand();
- Resource clusterResource = getClusterResource();
- rootQueue.setFairShare(clusterResource);
- // Recursively compute fair shares for all queues
- // and update metrics
- rootQueue.recomputeShares();
- updateRootQueueMetrics();
+ Resource clusterResource = getClusterResource();
+ rootQueue.setFairShare(clusterResource);
+ // Recursively compute fair shares for all queues
+ // and update metrics
+ rootQueue.recomputeShares();
+ updateRootQueueMetrics();
- if (LOG.isDebugEnabled()) {
- if (--updatesToSkipForDebug < 0) {
- updatesToSkipForDebug = UPDATE_DEBUG_FREQUENCY;
- LOG.debug("Cluster Capacity: " + clusterResource +
- " Allocations: " + rootMetrics.getAllocatedResources() +
- " Availability: " + Resource.newInstance(
- rootMetrics.getAvailableMB(),
- rootMetrics.getAvailableVirtualCores()) +
- " Demand: " + rootQueue.getDemand());
+ if (LOG.isDebugEnabled()) {
+ if (--updatesToSkipForDebug < 0) {
+ updatesToSkipForDebug = UPDATE_DEBUG_FREQUENCY;
+ LOG.debug("Cluster Capacity: " + clusterResource + " Allocations: "
+ + rootMetrics.getAllocatedResources() + " Availability: "
+ + Resource.newInstance(rootMetrics.getAvailableMB(),
+ rootMetrics.getAvailableVirtualCores()) + " Demand: " + rootQueue
+ .getDemand());
+ }
}
- }
- long duration = getClock().getTime() - start;
- fsOpDurations.addUpdateCallDuration(duration);
+ long duration = getClock().getTime() - start;
+ fsOpDurations.addUpdateCallDuration(duration);
+ } finally {
+ writeLock.unlock();
+ }
}
/**
@@ -389,23 +396,28 @@ public class FairScheduler extends
* such queues exist, compute how many tasks of each type need to be preempted
* and then select the right ones using preemptTasks.
*/
- protected synchronized void preemptTasksIfNecessary() {
- if (!shouldAttemptPreemption()) {
- return;
- }
+ protected void preemptTasksIfNecessary() {
+ try {
+ writeLock.lock();
+ if (!shouldAttemptPreemption()) {
+ return;
+ }
- long curTime = getClock().getTime();
- if (curTime - lastPreemptCheckTime < preemptionInterval) {
- return;
- }
- lastPreemptCheckTime = curTime;
+ long curTime = getClock().getTime();
+ if (curTime - lastPreemptCheckTime < preemptionInterval) {
+ return;
+ }
+ lastPreemptCheckTime = curTime;
- Resource resToPreempt = Resources.clone(Resources.none());
- for (FSLeafQueue sched : queueMgr.getLeafQueues()) {
- Resources.addTo(resToPreempt, resourceDeficit(sched, curTime));
- }
- if (isResourceGreaterThanNone(resToPreempt)) {
- preemptResources(resToPreempt);
+ Resource resToPreempt = Resources.clone(Resources.none());
+ for (FSLeafQueue sched : queueMgr.getLeafQueues()) {
+ Resources.addTo(resToPreempt, resourceDeficit(sched, curTime));
+ }
+ if (isResourceGreaterThanNone(resToPreempt)) {
+ preemptResources(resToPreempt);
+ }
+ } finally {
+ writeLock.unlock();
}
}
@@ -549,22 +561,27 @@ public class FairScheduler extends
return deficit;
}
- public synchronized RMContainerTokenSecretManager
+ public RMContainerTokenSecretManager
getContainerTokenSecretManager() {
return rmContext.getContainerTokenSecretManager();
}
- // synchronized for sizeBasedWeight
- public synchronized ResourceWeights getAppWeight(FSAppAttempt app) {
- double weight = 1.0;
- if (sizeBasedWeight) {
- // Set weight based on current memory demand
- weight = Math.log1p(app.getDemand().getMemorySize()) / Math.log(2);
+ public ResourceWeights getAppWeight(FSAppAttempt app) {
+ try {
+ readLock.lock();
+ double weight = 1.0;
+ if (sizeBasedWeight) {
+ // Set weight based on current memory demand
+ weight = Math.log1p(app.getDemand().getMemorySize()) / Math.log(2);
+ }
+ weight *= app.getPriority().getPriority();
+ ResourceWeights resourceWeights = app.getResourceWeights();
+ resourceWeights.setWeight((float) weight);
+ return resourceWeights;
+ } finally {
+ readLock.unlock();
}
- weight *= app.getPriority().getPriority();
- ResourceWeights resourceWeights = app.getResourceWeights();
- resourceWeights.setWeight((float)weight);
- return resourceWeights;
+
}
public Resource getIncrementResourceCapability() {
@@ -595,7 +612,7 @@ public class FairScheduler extends
return continuousSchedulingEnabled;
}
- public synchronized int getContinuousSchedulingSleepMs() {
+ public int getContinuousSchedulingSleepMs() {
return continuousSchedulingSleepMs;
}
@@ -617,114 +634,123 @@ public class FairScheduler extends
* user. This will accept a new app even if the user or queue is above
* configured limits, but the app will not be marked as runnable.
*/
- protected synchronized void addApplication(ApplicationId applicationId,
+ protected void addApplication(ApplicationId applicationId,
String queueName, String user, boolean isAppRecovering) {
if (queueName == null || queueName.isEmpty()) {
- String message = "Reject application " + applicationId +
- " submitted by user " + user + " with an empty queue name.";
+ String message =
+ "Reject application " + applicationId + " submitted by user " + user
+ + " with an empty queue name.";
LOG.info(message);
- rmContext.getDispatcher().getEventHandler()
- .handle(new RMAppEvent(applicationId,
- RMAppEventType.APP_REJECTED, message));
+ rmContext.getDispatcher().getEventHandler().handle(
+ new RMAppEvent(applicationId, RMAppEventType.APP_REJECTED,
+ message));
return;
}
if (queueName.startsWith(".") || queueName.endsWith(".")) {
- String message = "Reject application " + applicationId
- + " submitted by user " + user + " with an illegal queue name "
- + queueName + ". "
- + "The queue name cannot start/end with period.";
+ String message =
+ "Reject application " + applicationId + " submitted by user " + user
+ + " with an illegal queue name " + queueName + ". "
+ + "The queue name cannot start/end with period.";
LOG.info(message);
- rmContext.getDispatcher().getEventHandler()
- .handle(new RMAppEvent(applicationId,
- RMAppEventType.APP_REJECTED, message));
+ rmContext.getDispatcher().getEventHandler().handle(
+ new RMAppEvent(applicationId, RMAppEventType.APP_REJECTED,
+ message));
return;
}
- RMApp rmApp = rmContext.getRMApps().get(applicationId);
- FSLeafQueue queue = assignToQueue(rmApp, queueName, user);
- if (queue == null) {
- return;
- }
-
- // Enforce ACLs
- UserGroupInformation userUgi = UserGroupInformation.createRemoteUser(user);
-
- if (!queue.hasAccess(QueueACL.SUBMIT_APPLICATIONS, userUgi)
- && !queue.hasAccess(QueueACL.ADMINISTER_QUEUE, userUgi)) {
- String msg = "User " + userUgi.getUserName() +
- " cannot submit applications to queue " + queue.getName() +
- "(requested queuename is " + queueName + ")";
- LOG.info(msg);
- rmContext.getDispatcher().getEventHandler()
- .handle(new RMAppEvent(applicationId,
- RMAppEventType.APP_REJECTED, msg));
- return;
- }
-
-    SchedulerApplication<FSAppAttempt> application =
-        new SchedulerApplication<FSAppAttempt>(queue, user);
- applications.put(applicationId, application);
- queue.getMetrics().submitApp(user);
-
- LOG.info("Accepted application " + applicationId + " from user: " + user
- + ", in queue: " + queue.getName()
- + ", currently num of applications: " + applications.size());
- if (isAppRecovering) {
- if (LOG.isDebugEnabled()) {
- LOG.debug(applicationId
- + " is recovering. Skip notifying APP_ACCEPTED");
+ try {
+ writeLock.lock();
+ RMApp rmApp = rmContext.getRMApps().get(applicationId);
+ FSLeafQueue queue = assignToQueue(rmApp, queueName, user);
+ if (queue == null) {
+ return;
}
- } else {
- rmContext.getDispatcher().getEventHandler()
- .handle(new RMAppEvent(applicationId, RMAppEventType.APP_ACCEPTED));
+
+ // Enforce ACLs
+ UserGroupInformation userUgi = UserGroupInformation.createRemoteUser(
+ user);
+
+ if (!queue.hasAccess(QueueACL.SUBMIT_APPLICATIONS, userUgi) && !queue
+ .hasAccess(QueueACL.ADMINISTER_QUEUE, userUgi)) {
+ String msg = "User " + userUgi.getUserName()
+ + " cannot submit applications to queue " + queue.getName()
+ + "(requested queuename is " + queueName + ")";
+ LOG.info(msg);
+ rmContext.getDispatcher().getEventHandler().handle(
+ new RMAppEvent(applicationId, RMAppEventType.APP_REJECTED, msg));
+ return;
+ }
+
+      SchedulerApplication<FSAppAttempt> application =
+          new SchedulerApplication<FSAppAttempt>(queue, user);
+ applications.put(applicationId, application);
+ queue.getMetrics().submitApp(user);
+
+ LOG.info("Accepted application " + applicationId + " from user: " + user
+ + ", in queue: " + queue.getName()
+ + ", currently num of applications: " + applications.size());
+ if (isAppRecovering) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug(applicationId
+ + " is recovering. Skip notifying APP_ACCEPTED");
+ }
+      } else {
+ rmContext.getDispatcher().getEventHandler().handle(
+ new RMAppEvent(applicationId, RMAppEventType.APP_ACCEPTED));
+ }
+ } finally {
+ writeLock.unlock();
}
}
/**
* Add a new application attempt to the scheduler.
*/
- protected synchronized void addApplicationAttempt(
+ protected void addApplicationAttempt(
ApplicationAttemptId applicationAttemptId,
boolean transferStateFromPreviousAttempt,
boolean isAttemptRecovering) {
-    SchedulerApplication<FSAppAttempt> application =
- applications.get(applicationAttemptId.getApplicationId());
- String user = application.getUser();
- FSLeafQueue queue = (FSLeafQueue) application.getQueue();
+ try {
+ writeLock.lock();
+ SchedulerApplication<FSAppAttempt> application = applications.get(
+ applicationAttemptId.getApplicationId());
+ String user = application.getUser();
+ FSLeafQueue queue = (FSLeafQueue) application.getQueue();
- FSAppAttempt attempt =
- new FSAppAttempt(this, applicationAttemptId, user,
- queue, new ActiveUsersManager(getRootQueueMetrics()),
- rmContext);
- if (transferStateFromPreviousAttempt) {
- attempt.transferStateFromPreviousAttempt(application
- .getCurrentAppAttempt());
- }
- application.setCurrentAppAttempt(attempt);
-
- boolean runnable = maxRunningEnforcer.canAppBeRunnable(queue, user);
- queue.addApp(attempt, runnable);
- if (runnable) {
- maxRunningEnforcer.trackRunnableApp(attempt);
- } else {
- maxRunningEnforcer.trackNonRunnableApp(attempt);
- }
-
- queue.getMetrics().submitAppAttempt(user);
-
- LOG.info("Added Application Attempt " + applicationAttemptId
- + " to scheduler from user: " + user);
-
- if (isAttemptRecovering) {
- if (LOG.isDebugEnabled()) {
- LOG.debug(applicationAttemptId
- + " is recovering. Skipping notifying ATTEMPT_ADDED");
+ FSAppAttempt attempt = new FSAppAttempt(this, applicationAttemptId, user,
+ queue, new ActiveUsersManager(getRootQueueMetrics()), rmContext);
+ if (transferStateFromPreviousAttempt) {
+ attempt.transferStateFromPreviousAttempt(
+ application.getCurrentAppAttempt());
}
- } else {
- rmContext.getDispatcher().getEventHandler().handle(
- new RMAppAttemptEvent(applicationAttemptId,
- RMAppAttemptEventType.ATTEMPT_ADDED));
+ application.setCurrentAppAttempt(attempt);
+
+ boolean runnable = maxRunningEnforcer.canAppBeRunnable(queue, user);
+ queue.addApp(attempt, runnable);
+ if (runnable) {
+ maxRunningEnforcer.trackRunnableApp(attempt);
+ } else {
+ maxRunningEnforcer.trackNonRunnableApp(attempt);
+ }
+
+ queue.getMetrics().submitAppAttempt(user);
+
+ LOG.info("Added Application Attempt " + applicationAttemptId
+ + " to scheduler from user: " + user);
+
+ if (isAttemptRecovering) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug(applicationAttemptId
+ + " is recovering. Skipping notifying ATTEMPT_ADDED");
+ }
+ } else {
+ rmContext.getDispatcher().getEventHandler().handle(
+ new RMAppAttemptEvent(applicationAttemptId,
+ RMAppAttemptEventType.ATTEMPT_ADDED));
+ }
+ } finally {
+ writeLock.unlock();
}
}
@@ -770,70 +796,71 @@ public class FairScheduler extends
return queue;
}
- private synchronized void removeApplication(ApplicationId applicationId,
+ private void removeApplication(ApplicationId applicationId,
RMAppState finalState) {
- SchedulerApplication<FSAppAttempt> application =
- applications.get(applicationId);
- if (application == null){
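+ // The applications map is a ConcurrentHashMap, so a single atomic
+ // remove() is sufficient here and no scheduler lock is needed.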
+ SchedulerApplication<FSAppAttempt> application = applications.remove(
+ applicationId);
+ if (application == null) {
LOG.warn("Couldn't find application " + applicationId);
- return;
+ } else {
+ application.stop(finalState);
}
- application.stop(finalState);
- applications.remove(applicationId);
}
- private synchronized void removeApplicationAttempt(
+ private void removeApplicationAttempt(
ApplicationAttemptId applicationAttemptId,
RMAppAttemptState rmAppAttemptFinalState, boolean keepContainers) {
- LOG.info("Application " + applicationAttemptId + " is done." +
- " finalState=" + rmAppAttemptFinalState);
- SchedulerApplication<FSAppAttempt> application =
- applications.get(applicationAttemptId.getApplicationId());
- FSAppAttempt attempt = getSchedulerApp(applicationAttemptId);
+ try {
+ writeLock.lock();
+ LOG.info(
+ "Application " + applicationAttemptId + " is done." + " finalState="
+ + rmAppAttemptFinalState);
+ FSAppAttempt attempt = getApplicationAttempt(applicationAttemptId);
- if (attempt == null || application == null) {
- LOG.info("Unknown application " + applicationAttemptId + " has completed!");
- return;
- }
-
- // Release all the running containers
- for (RMContainer rmContainer : attempt.getLiveContainers()) {
- if (keepContainers
- && rmContainer.getState().equals(RMContainerState.RUNNING)) {
- // do not kill the running container in the case of work-preserving AM
- // restart.
- LOG.info("Skip killing " + rmContainer.getContainerId());
- continue;
+ if (attempt == null) {
+ LOG.info(
+ "Unknown application " + applicationAttemptId + " has completed!");
+ return;
}
- super.completedContainer(rmContainer,
- SchedulerUtils.createAbnormalContainerStatus(
- rmContainer.getContainerId(),
- SchedulerUtils.COMPLETED_APPLICATION),
- RMContainerEventType.KILL);
- }
- // Release all reserved containers
- for (RMContainer rmContainer : attempt.getReservedContainers()) {
- super.completedContainer(rmContainer,
- SchedulerUtils.createAbnormalContainerStatus(
- rmContainer.getContainerId(),
- "Application Complete"),
- RMContainerEventType.KILL);
- }
- // Clean up pending requests, metrics etc.
- attempt.stop(rmAppAttemptFinalState);
+ // Release all the running containers
+ for (RMContainer rmContainer : attempt.getLiveContainers()) {
+ if (keepContainers && rmContainer.getState().equals(
+ RMContainerState.RUNNING)) {
+ // do not kill the running container in the case of work-preserving AM
+ // restart.
+ LOG.info("Skip killing " + rmContainer.getContainerId());
+ continue;
+ }
+ super.completedContainer(rmContainer, SchedulerUtils
+ .createAbnormalContainerStatus(rmContainer.getContainerId(),
+ SchedulerUtils.COMPLETED_APPLICATION),
+ RMContainerEventType.KILL);
+ }
- // Inform the queue
- FSLeafQueue queue = queueMgr.getLeafQueue(attempt.getQueue()
- .getQueueName(), false);
- boolean wasRunnable = queue.removeApp(attempt);
+ // Release all reserved containers
+ for (RMContainer rmContainer : attempt.getReservedContainers()) {
+ super.completedContainer(rmContainer, SchedulerUtils
+ .createAbnormalContainerStatus(rmContainer.getContainerId(),
+ "Application Complete"), RMContainerEventType.KILL);
+ }
+ // Clean up pending requests, metrics etc.
+ attempt.stop(rmAppAttemptFinalState);
- if (wasRunnable) {
- maxRunningEnforcer.untrackRunnableApp(attempt);
- maxRunningEnforcer.updateRunnabilityOnAppRemoval(attempt,
- attempt.getQueue());
- } else {
- maxRunningEnforcer.untrackNonRunnableApp(attempt);
+ // Inform the queue
+ FSLeafQueue queue = queueMgr.getLeafQueue(
+ attempt.getQueue().getQueueName(), false);
+ boolean wasRunnable = queue.removeApp(attempt);
+
+ if (wasRunnable) {
+ maxRunningEnforcer.untrackRunnableApp(attempt);
+ maxRunningEnforcer.updateRunnabilityOnAppRemoval(attempt,
+ attempt.getQueue());
+ } else {
+ maxRunningEnforcer.untrackNonRunnableApp(attempt);
+ }
+ } finally {
+ writeLock.unlock();
}
}
@@ -841,97 +868,108 @@ public class FairScheduler extends
* Clean up a completed container.
*/
@Override
- protected synchronized void completedContainerInternal(
+ protected void completedContainerInternal(
RMContainer rmContainer, ContainerStatus containerStatus,
RMContainerEventType event) {
+ try {
+ writeLock.lock();
+ Container container = rmContainer.getContainer();
- Container container = rmContainer.getContainer();
+ // Get the application for the finished container
+ FSAppAttempt application = getCurrentAttemptForContainer(
+ container.getId());
+ ApplicationId appId =
+ container.getId().getApplicationAttemptId().getApplicationId();
+ if (application == null) {
+ LOG.info(
+ "Container " + container + " of" + " finished application " + appId
+ + " completed with event " + event);
+ return;
+ }
- // Get the application for the finished container
- FSAppAttempt application =
- getCurrentAttemptForContainer(container.getId());
- ApplicationId appId =
- container.getId().getApplicationAttemptId().getApplicationId();
- if (application == null) {
- LOG.info("Container " + container + " of" +
- " finished application " + appId +
- " completed with event " + event);
- return;
- }
+ // Get the node on which the container was allocated
+ FSSchedulerNode node = getFSSchedulerNode(container.getNodeId());
- // Get the node on which the container was allocated
- FSSchedulerNode node = getFSSchedulerNode(container.getNodeId());
+ if (rmContainer.getState() == RMContainerState.RESERVED) {
+ application.unreserve(rmContainer.getReservedSchedulerKey(), node);
+ } else {
+ application.containerCompleted(rmContainer, containerStatus, event);
+ node.releaseContainer(container);
+ updateRootQueueMetrics();
+ }
- if (rmContainer.getState() == RMContainerState.RESERVED) {
- application.unreserve(rmContainer.getReservedSchedulerKey(), node);
- } else {
- application.containerCompleted(rmContainer, containerStatus, event);
- node.releaseContainer(container);
- updateRootQueueMetrics();
- }
-
- if (LOG.isDebugEnabled()) {
- LOG.debug("Application attempt " + application.getApplicationAttemptId()
- + " released container " + container.getId() + " on node: " + node
- + " with event: " + event);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Application attempt " + application.getApplicationAttemptId()
+ + " released container " + container.getId() + " on node: " + node
+ + " with event: " + event);
+ }
+ } finally {
+ writeLock.unlock();
}
}
- private synchronized void addNode(List<NMContainerStatus> containerReports,
+ private void addNode(List<NMContainerStatus> containerReports,
RMNode node) {
- FSSchedulerNode schedulerNode = new FSSchedulerNode(node, usePortForNodeName);
- nodeTracker.addNode(schedulerNode);
+ try {
+ writeLock.lock();
+ FSSchedulerNode schedulerNode = new FSSchedulerNode(node,
+ usePortForNodeName);
+ nodeTracker.addNode(schedulerNode);
- triggerUpdate();
+ triggerUpdate();
- Resource clusterResource = getClusterResource();
- queueMgr.getRootQueue().setSteadyFairShare(clusterResource);
- queueMgr.getRootQueue().recomputeSteadyShares();
- LOG.info("Added node " + node.getNodeAddress() +
- " cluster capacity: " + clusterResource);
+ Resource clusterResource = getClusterResource();
+ queueMgr.getRootQueue().setSteadyFairShare(clusterResource);
+ queueMgr.getRootQueue().recomputeSteadyShares();
+ LOG.info("Added node " + node.getNodeAddress() + " cluster capacity: "
+ + clusterResource);
- recoverContainersOnNode(containerReports, node);
- updateRootQueueMetrics();
+ recoverContainersOnNode(containerReports, node);
+ updateRootQueueMetrics();
+ } finally {
+ writeLock.unlock();
+ }
}
- private synchronized void removeNode(RMNode rmNode) {
- NodeId nodeId = rmNode.getNodeID();
- FSSchedulerNode node = nodeTracker.getNode(nodeId);
- if (node == null) {
- LOG.error("Attempting to remove non-existent node " + nodeId);
- return;
+ private void removeNode(RMNode rmNode) {
+ try {
+ writeLock.lock();
+ NodeId nodeId = rmNode.getNodeID();
+ FSSchedulerNode node = nodeTracker.getNode(nodeId);
+ if (node == null) {
+ LOG.error("Attempting to remove non-existent node " + nodeId);
+ return;
+ }
+
+ // Remove running containers
+ List<RMContainer> runningContainers =
+ node.getCopiedListOfRunningContainers();
+ for (RMContainer container : runningContainers) {
+ super.completedContainer(container, SchedulerUtils
+ .createAbnormalContainerStatus(container.getContainerId(),
+ SchedulerUtils.LOST_CONTAINER), RMContainerEventType.KILL);
+ }
+
+ // Remove reservations, if any
+ RMContainer reservedContainer = node.getReservedContainer();
+ if (reservedContainer != null) {
+ super.completedContainer(reservedContainer, SchedulerUtils
+ .createAbnormalContainerStatus(reservedContainer.getContainerId(),
+ SchedulerUtils.LOST_CONTAINER), RMContainerEventType.KILL);
+ }
+
+ nodeTracker.removeNode(nodeId);
+ Resource clusterResource = getClusterResource();
+ queueMgr.getRootQueue().setSteadyFairShare(clusterResource);
+ queueMgr.getRootQueue().recomputeSteadyShares();
+ updateRootQueueMetrics();
+ triggerUpdate();
+
+ LOG.info("Removed node " + rmNode.getNodeAddress() + " cluster capacity: "
+ + clusterResource);
+ } finally {
+ writeLock.unlock();
}
-
- // Remove running containers
- List<RMContainer> runningContainers =
- node.getCopiedListOfRunningContainers();
- for (RMContainer container : runningContainers) {
- super.completedContainer(container,
- SchedulerUtils.createAbnormalContainerStatus(
- container.getContainerId(),
- SchedulerUtils.LOST_CONTAINER),
- RMContainerEventType.KILL);
- }
-
- // Remove reservations, if any
- RMContainer reservedContainer = node.getReservedContainer();
- if (reservedContainer != null) {
- super.completedContainer(reservedContainer,
- SchedulerUtils.createAbnormalContainerStatus(
- reservedContainer.getContainerId(),
- SchedulerUtils.LOST_CONTAINER),
- RMContainerEventType.KILL);
- }
-
- nodeTracker.removeNode(nodeId);
- Resource clusterResource = getClusterResource();
- queueMgr.getRootQueue().setSteadyFairShare(clusterResource);
- queueMgr.getRootQueue().recomputeSteadyShares();
- updateRootQueueMetrics();
- triggerUpdate();
-
- LOG.info("Removed node " + rmNode.getNodeAddress() +
- " cluster capacity: " + clusterResource);
}
@Override
@@ -960,12 +998,13 @@ public class FairScheduler extends
// Release containers
releaseContainers(release, application);
- synchronized (application) {
+ try {
+ application.getWriteLock().lock();
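+ // Guard the resource request update with the attempt's own write lock
+ // instead of synchronizing on the application object.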
if (!ask.isEmpty()) {
if (LOG.isDebugEnabled()) {
- LOG.debug("allocate: pre-update" +
- " applicationAttemptId=" + appAttemptId +
- " application=" + application.getApplicationId());
+ LOG.debug(
+ "allocate: pre-update" + " applicationAttemptId=" + appAttemptId
+ + " application=" + application.getApplicationId());
}
application.showRequests();
@@ -974,98 +1013,107 @@ public class FairScheduler extends
application.showRequests();
}
-
- if (LOG.isDebugEnabled()) {
- LOG.debug("allocate: post-update" +
- " applicationAttemptId=" + appAttemptId +
- " #ask=" + ask.size() +
- " reservation= " + application.getCurrentReservation());
-
- LOG.debug("Preempting " + application.getPreemptionContainers().size()
- + " container(s)");
- }
-
- Set<ContainerId> preemptionContainerIds = new HashSet<ContainerId>();
- for (RMContainer container : application.getPreemptionContainers()) {
- preemptionContainerIds.add(container.getContainerId());
- }
-
- application.updateBlacklist(blacklistAdditions, blacklistRemovals);
-
- List<Container> newlyAllocatedContainers =
- application.pullNewlyAllocatedContainers();
- // Record container allocation time
- if (!(newlyAllocatedContainers.isEmpty())) {
- application.recordContainerAllocationTime(getClock().getTime());
- }
-
- Resource headroom = application.getHeadroom();
- application.setApplicationHeadroomForMetrics(headroom);
- return new Allocation(newlyAllocatedContainers, headroom,
- preemptionContainerIds, null, null, application.pullUpdatedNMTokens());
+ } finally {
+ application.getWriteLock().unlock();
}
+
+ if (LOG.isDebugEnabled()) {
+ LOG.debug(
+ "allocate: post-update" + " applicationAttemptId=" + appAttemptId
+ + " #ask=" + ask.size() + " reservation= " + application
+ .getCurrentReservation());
+
+ LOG.debug("Preempting " + application.getPreemptionContainers().size()
+ + " container(s)");
+ }
+
+ Set<ContainerId> preemptionContainerIds = new HashSet<>();
+ for (RMContainer container : application.getPreemptionContainers()) {
+ preemptionContainerIds.add(container.getContainerId());
+ }
+
+ application.updateBlacklist(blacklistAdditions, blacklistRemovals);
+
+ List<Container> newlyAllocatedContainers =
+ application.pullNewlyAllocatedContainers();
+ // Record container allocation time
+ if (!(newlyAllocatedContainers.isEmpty())) {
+ application.recordContainerAllocationTime(getClock().getTime());
+ }
+
+ Resource headroom = application.getHeadroom();
+ application.setApplicationHeadroomForMetrics(headroom);
+ return new Allocation(newlyAllocatedContainers, headroom,
+ preemptionContainerIds, null, null,
+ application.pullUpdatedNMTokens());
}
/**
* Process a heartbeat update from a node.
*/
- private synchronized void nodeUpdate(RMNode nm) {
- long start = getClock().getTime();
- if (LOG.isDebugEnabled()) {
- LOG.debug("nodeUpdate: " + nm +
- " cluster capacity: " + getClusterResource());
- }
- eventLog.log("HEARTBEAT", nm.getHostName());
- FSSchedulerNode node = getFSSchedulerNode(nm.getNodeID());
-
- List<UpdatedContainerInfo> containerInfoList = nm.pullContainerUpdates();
- List<ContainerStatus> newlyLaunchedContainers = new ArrayList<>();
- List<ContainerStatus> completedContainers = new ArrayList<>();
- for(UpdatedContainerInfo containerInfo : containerInfoList) {
- newlyLaunchedContainers.addAll(containerInfo.getNewlyLaunchedContainers());
- completedContainers.addAll(containerInfo.getCompletedContainers());
- }
- // Processing the newly launched containers
- for (ContainerStatus launchedContainer : newlyLaunchedContainers) {
- containerLaunchedOnNode(launchedContainer.getContainerId(), node);
- }
+ private void nodeUpdate(RMNode nm) {
+ try {
+ writeLock.lock();
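+ // Heartbeat handling mutates node, queue and application state, so it
+ // runs under the scheduler write lock.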
+ long start = getClock().getTime();
+ if (LOG.isDebugEnabled()) {
+ LOG.debug(
+ "nodeUpdate: " + nm + " cluster capacity: " + getClusterResource());
+ }
+ eventLog.log("HEARTBEAT", nm.getHostName());
+ FSSchedulerNode node = getFSSchedulerNode(nm.getNodeID());
- // Process completed containers
- for (ContainerStatus completedContainer : completedContainers) {
- ContainerId containerId = completedContainer.getContainerId();
- LOG.debug("Container FINISHED: " + containerId);
- super.completedContainer(getRMContainer(containerId),
- completedContainer, RMContainerEventType.FINISHED);
- }
+ List<UpdatedContainerInfo> containerInfoList = nm.pullContainerUpdates();
+ List<ContainerStatus> newlyLaunchedContainers =
+ new ArrayList<>();
+ List<ContainerStatus> completedContainers =
+ new ArrayList<>();
+ for (UpdatedContainerInfo containerInfo : containerInfoList) {
+ newlyLaunchedContainers.addAll(
+ containerInfo.getNewlyLaunchedContainers());
+ completedContainers.addAll(containerInfo.getCompletedContainers());
+ }
+ // Processing the newly launched containers
+ for (ContainerStatus launchedContainer : newlyLaunchedContainers) {
+ containerLaunchedOnNode(launchedContainer.getContainerId(), node);
+ }
- // If the node is decommissioning, send an update to have the total
- // resource equal to the used resource, so no available resource to
- // schedule.
- if (nm.getState() == NodeState.DECOMMISSIONING) {
- this.rmContext
- .getDispatcher()
- .getEventHandler()
- .handle(
- new RMNodeResourceUpdateEvent(nm.getNodeID(), ResourceOption
- .newInstance(getSchedulerNode(nm.getNodeID())
- .getAllocatedResource(), 0)));
- }
+ // Process completed containers
+ for (ContainerStatus completedContainer : completedContainers) {
+ ContainerId containerId = completedContainer.getContainerId();
+ LOG.debug("Container FINISHED: " + containerId);
+ super.completedContainer(getRMContainer(containerId),
+ completedContainer, RMContainerEventType.FINISHED);
+ }
- if (continuousSchedulingEnabled) {
- if (!completedContainers.isEmpty()) {
+ // If the node is decommissioning, send an update to have the total
+ // resource equal to the used resource, so no available resource to
+ // schedule.
+ if (nm.getState() == NodeState.DECOMMISSIONING) {
+ this.rmContext.getDispatcher().getEventHandler().handle(
+ new RMNodeResourceUpdateEvent(nm.getNodeID(), ResourceOption
+ .newInstance(
+ getSchedulerNode(nm.getNodeID()).getAllocatedResource(),
+ 0)));
+ }
+
+ if (continuousSchedulingEnabled) {
+ if (!completedContainers.isEmpty()) {
+ attemptScheduling(node);
+ }
+ } else {
attemptScheduling(node);
}
- } else {
- attemptScheduling(node);
+
+ // Updating node resource utilization
+ node.setAggregatedContainersUtilization(
+ nm.getAggregatedContainersUtilization());
+ node.setNodeUtilization(nm.getNodeUtilization());
+
+ long duration = getClock().getTime() - start;
+ fsOpDurations.addNodeUpdateDuration(duration);
+ } finally {
+ writeLock.unlock();
}
-
- // Updating node resource utilization
- node.setAggregatedContainersUtilization(
- nm.getAggregatedContainersUtilization());
- node.setNodeUtilization(nm.getNodeUtilization());
-
- long duration = getClock().getTime() - start;
- fsOpDurations.addNodeUpdateDuration(duration);
}
void continuousSchedulingAttempt() throws InterruptedException {
@@ -1126,52 +1174,59 @@ public class FairScheduler extends
}
@VisibleForTesting
- synchronized void attemptScheduling(FSSchedulerNode node) {
- if (rmContext.isWorkPreservingRecoveryEnabled()
- && !rmContext.isSchedulerReadyForAllocatingContainers()) {
- return;
- }
+ void attemptScheduling(FSSchedulerNode node) {
+ try {
+ writeLock.lock();
+ if (rmContext.isWorkPreservingRecoveryEnabled() && !rmContext
+ .isSchedulerReadyForAllocatingContainers()) {
+ return;
+ }
- final NodeId nodeID = node.getNodeID();
- if (!nodeTracker.exists(nodeID)) {
- // The node might have just been removed while this thread was waiting
- // on the synchronized lock before it entered this synchronized method
- LOG.info("Skipping scheduling as the node " + nodeID +
- " has been removed");
- return;
- }
+ final NodeId nodeID = node.getNodeID();
+ if (!nodeTracker.exists(nodeID)) {
+ // The node might have just been removed while this thread was waiting
+ // for the scheduler write lock before it entered this method
+ LOG.info(
+ "Skipping scheduling as the node " + nodeID + " has been removed");
+ return;
+ }
- // Assign new containers...
- // 1. Check for reserved applications
- // 2. Schedule if there are no reservations
+ // Assign new containers...
+ // 1. Check for reserved applications
+ // 2. Schedule if there are no reservations
- boolean validReservation = false;
- FSAppAttempt reservedAppSchedulable = node.getReservedAppSchedulable();
- if (reservedAppSchedulable != null) {
- validReservation = reservedAppSchedulable.assignReservedContainer(node);
- }
- if (!validReservation) {
- // No reservation, schedule at queue which is farthest below fair share
- int assignedContainers = 0;
- Resource assignedResource = Resources.clone(Resources.none());
- Resource maxResourcesToAssign =
- Resources.multiply(node.getUnallocatedResource(), 0.5f);
- while (node.getReservedContainer() == null) {
- boolean assignedContainer = false;
- Resource assignment = queueMgr.getRootQueue().assignContainer(node);
- if (!assignment.equals(Resources.none())) {
- assignedContainers++;
- assignedContainer = true;
- Resources.addTo(assignedResource, assignment);
- }
- if (!assignedContainer) { break; }
- if (!shouldContinueAssigning(assignedContainers,
- maxResourcesToAssign, assignedResource)) {
- break;
+ boolean validReservation = false;
+ FSAppAttempt reservedAppSchedulable = node.getReservedAppSchedulable();
+ if (reservedAppSchedulable != null) {
+ validReservation = reservedAppSchedulable.assignReservedContainer(node);
+ }
+ if (!validReservation) {
+ // No reservation, schedule at queue which is farthest below fair share
+ int assignedContainers = 0;
+ Resource assignedResource = Resources.clone(Resources.none());
+ Resource maxResourcesToAssign = Resources.multiply(
+ node.getUnallocatedResource(), 0.5f);
+ while (node.getReservedContainer() == null) {
+ boolean assignedContainer = false;
+ Resource assignment = queueMgr.getRootQueue().assignContainer(node);
+ if (!assignment.equals(Resources.none())) {
+ assignedContainers++;
+ assignedContainer = true;
+ Resources.addTo(assignedResource, assignment);
+ }
+ if (!assignedContainer) {
+ break;
+ }
+ if (!shouldContinueAssigning(assignedContainers, maxResourcesToAssign,
+ assignedResource)) {
+ break;
+ }
}
}
+ updateRootQueueMetrics();
+ } finally {
+ writeLock.unlock();
}
- updateRootQueueMetrics();
}
public FSAppAttempt getSchedulerApp(ApplicationAttemptId appAttemptId) {
@@ -1314,51 +1369,55 @@ public class FairScheduler extends
}
}
- private synchronized String resolveReservationQueueName(String queueName,
+ private String resolveReservationQueueName(String queueName,
ApplicationId applicationId, ReservationId reservationID,
boolean isRecovering) {
- FSQueue queue = queueMgr.getQueue(queueName);
- if ((queue == null) || !allocConf.isReservable(queue.getQueueName())) {
- return queueName;
- }
- // Use fully specified name from now on (including root. prefix)
- queueName = queue.getQueueName();
- if (reservationID != null) {
- String resQName = queueName + "." + reservationID.toString();
- queue = queueMgr.getQueue(resQName);
- if (queue == null) {
- // reservation has terminated during failover
- if (isRecovering && allocConf.getMoveOnExpiry(queueName)) {
- // move to the default child queue of the plan
- return getDefaultQueueForPlanQueue(queueName);
+ try {
+ readLock.lock();
+ FSQueue queue = queueMgr.getQueue(queueName);
+ if ((queue == null) || !allocConf.isReservable(queue.getQueueName())) {
+ return queueName;
+ }
+ // Use fully specified name from now on (including root. prefix)
+ queueName = queue.getQueueName();
+ if (reservationID != null) {
+ String resQName = queueName + "." + reservationID.toString();
+ queue = queueMgr.getQueue(resQName);
+ if (queue == null) {
+ // reservation has terminated during failover
+ if (isRecovering && allocConf.getMoveOnExpiry(queueName)) {
+ // move to the default child queue of the plan
+ return getDefaultQueueForPlanQueue(queueName);
+ }
+ String message = "Application " + applicationId
+ + " submitted to a reservation which is not yet "
+ + "currently active: " + resQName;
+ this.rmContext.getDispatcher().getEventHandler().handle(
+ new RMAppEvent(applicationId, RMAppEventType.APP_REJECTED,
+ message));
+ return null;
}
- String message =
- "Application "
- + applicationId
- + " submitted to a reservation which is not yet currently active: "
- + resQName;
- this.rmContext.getDispatcher().getEventHandler()
- .handle(new RMAppEvent(applicationId,
- RMAppEventType.APP_REJECTED, message));
- return null;
+ if (!queue.getParent().getQueueName().equals(queueName)) {
+ String message =
+ "Application: " + applicationId + " submitted to a reservation "
+ + resQName + " which does not belong to the specified queue: "
+ + queueName;
+ this.rmContext.getDispatcher().getEventHandler().handle(
+ new RMAppEvent(applicationId, RMAppEventType.APP_REJECTED,
+ message));
+ return null;
+ }
+ // use the reservation queue to run the app
+ queueName = resQName;
+ } else {
+ // use the default child queue of the plan for unreserved apps
+ queueName = getDefaultQueueForPlanQueue(queueName);
}
- if (!queue.getParent().getQueueName().equals(queueName)) {
- String message =
- "Application: " + applicationId + " submitted to a reservation "
- + resQName + " which does not belong to the specified queue: "
- + queueName;
- this.rmContext.getDispatcher().getEventHandler()
- .handle(new RMAppEvent(applicationId,
- RMAppEventType.APP_REJECTED, message));
- return null;
- }
- // use the reservation queue to run the app
- queueName = resQName;
- } else {
- // use the default child queue of the plan for unreserved apps
- queueName = getDefaultQueueForPlanQueue(queueName);
+ return queueName;
+ } finally {
+ readLock.unlock();
}
- return queueName;
+
}
private String getDefaultQueueForPlanQueue(String queueName) {
@@ -1372,12 +1431,13 @@ public class FairScheduler extends
// NOT IMPLEMENTED
}
- public synchronized void setRMContext(RMContext rmContext) {
+ public void setRMContext(RMContext rmContext) {
this.rmContext = rmContext;
}
private void initScheduler(Configuration conf) throws IOException {
- synchronized (this) {
+ try {
+ writeLock.lock();
this.conf = new FairSchedulerConfiguration(conf);
validateConf(this.conf);
minimumAllocation = this.conf.getMinimumAllocation();
@@ -1385,8 +1445,7 @@ public class FairScheduler extends
incrAllocation = this.conf.getIncrementAllocation();
updateReservationThreshold();
continuousSchedulingEnabled = this.conf.isContinuousSchedulingEnabled();
- continuousSchedulingSleepMs =
- this.conf.getContinuousSchedulingSleepMs();
+ continuousSchedulingSleepMs = this.conf.getContinuousSchedulingSleepMs();
nodeLocalityThreshold = this.conf.getLocalityThresholdNode();
rackLocalityThreshold = this.conf.getLocalityThresholdRack();
nodeLocalityDelayMs = this.conf.getLocalityDelayNodeMs();
@@ -1407,8 +1466,8 @@ public class FairScheduler extends
if (updateInterval < 0) {
updateInterval = FairSchedulerConfiguration.DEFAULT_UPDATE_INTERVAL_MS;
LOG.warn(FairSchedulerConfiguration.UPDATE_INTERVAL_MS
- + " is invalid, so using default value " +
- +FairSchedulerConfiguration.DEFAULT_UPDATE_INTERVAL_MS
+ + " is invalid, so using default value "
+ + FairSchedulerConfiguration.DEFAULT_UPDATE_INTERVAL_MS
+ " ms instead");
}
@@ -1416,8 +1475,7 @@ public class FairScheduler extends
fsOpDurations = FSOpDurations.getInstance(true);
// This stores per-application scheduling information
- this.applications = new ConcurrentHashMap<
- ApplicationId, SchedulerApplication<FSAppAttempt>>();
+ this.applications = new ConcurrentHashMap<>();
this.eventLog = new FairSchedulerEventLog();
eventLog.init(this.conf);
@@ -1438,6 +1496,8 @@ public class FairScheduler extends
schedulingThread.setName("FairSchedulerContinuousScheduling");
schedulingThread.setDaemon(true);
}
+ } finally {
+ writeLock.unlock();
}
allocsLoader.init(conf);
@@ -1460,15 +1520,21 @@ public class FairScheduler extends
reservationThreshold = newThreshold;
}
- private synchronized void startSchedulerThreads() {
- Preconditions.checkNotNull(updateThread, "updateThread is null");
- Preconditions.checkNotNull(allocsLoader, "allocsLoader is null");
- updateThread.start();
- if (continuousSchedulingEnabled) {
- Preconditions.checkNotNull(schedulingThread, "schedulingThread is null");
- schedulingThread.start();
+ private void startSchedulerThreads() {
+ try {
+ writeLock.lock();
+ Preconditions.checkNotNull(updateThread, "updateThread is null");
+ Preconditions.checkNotNull(allocsLoader, "allocsLoader is null");
+ updateThread.start();
+ if (continuousSchedulingEnabled) {
+ Preconditions.checkNotNull(schedulingThread,
+ "schedulingThread is null");
+ schedulingThread.start();
+ }
+ allocsLoader.start();
+ } finally {
+ writeLock.unlock();
}
- allocsLoader.start();
}
@Override
@@ -1485,7 +1551,8 @@ public class FairScheduler extends
@Override
public void serviceStop() throws Exception {
- synchronized (this) {
+ try {
+ writeLock.lock();
if (updateThread != null) {
updateThread.interrupt();
updateThread.join(THREAD_JOIN_TIMEOUT_MS);
@@ -1499,6 +1566,8 @@ public class FairScheduler extends
if (allocsLoader != null) {
allocsLoader.stop();
}
+ } finally {
+ writeLock.unlock();
}
super.serviceStop();
@@ -1542,17 +1611,22 @@ public class FairScheduler extends
}
@Override
- public synchronized boolean checkAccess(UserGroupInformation callerUGI,
+ public boolean checkAccess(UserGroupInformation callerUGI,
QueueACL acl, String queueName) {
- FSQueue queue = getQueueManager().getQueue(queueName);
- if (queue == null) {
- if (LOG.isDebugEnabled()) {
- LOG.debug("ACL not found for queue access-type " + acl
- + " for queue " + queueName);
+ try {
+ readLock.lock();
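+ // Queue lookup is read-only, so the read lock is sufficient here.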
+ FSQueue queue = getQueueManager().getQueue(queueName);
+ if (queue == null) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("ACL not found for queue access-type " + acl + " for queue "
+ + queueName);
+ }
+ return false;
}
- return false;
+ return queue.hasAccess(acl, callerUGI);
+ } finally {
+ readLock.unlock();
}
- return queue.hasAccess(acl, callerUGI);
}
public AllocationConfiguration getAllocationConfiguration() {
@@ -1566,12 +1640,16 @@ public class FairScheduler extends
public void onReload(AllocationConfiguration queueInfo) {
// Commit the reload; also create any queue defined in the alloc file
// if it does not already exist, so it can be displayed on the web UI.
- synchronized (FairScheduler.this) {
+
+ writeLock.lock();
+ try {
allocConf = queueInfo;
allocConf.getDefaultSchedulingPolicy().initialize(getClusterResource());
queueMgr.updateAllocationConfiguration(allocConf);
applyChildDefaults();
maxRunningEnforcer.updateRunnabilityOnReload();
+ } finally {
+ writeLock.unlock();
}
}
}
@@ -1616,32 +1694,41 @@ public class FairScheduler extends
}
@Override
- public synchronized String moveApplication(ApplicationId appId,
+ public String moveApplication(ApplicationId appId,
String queueName) throws YarnException {
- SchedulerApplication<FSAppAttempt> app = applications.get(appId);
- if (app == null) {
- throw new YarnException("App to be moved " + appId + " not found.");
- }
- FSAppAttempt attempt = (FSAppAttempt) app.getCurrentAppAttempt();
- // To serialize with FairScheduler#allocate, synchronize on app attempt
- synchronized (attempt) {
- FSLeafQueue oldQueue = (FSLeafQueue) app.getQueue();
- String destQueueName = handleMoveToPlanQueue(queueName);
- FSLeafQueue targetQueue = queueMgr.getLeafQueue(destQueueName, false);
- if (targetQueue == null) {
- throw new YarnException("Target queue " + queueName
- + " not found or is not a leaf queue.");
+ try {
+ writeLock.lock();
+ SchedulerApplication<FSAppAttempt> app = applications.get(appId);
+ if (app == null) {
+ throw new YarnException("App to be moved " + appId + " not found.");
}
- if (targetQueue == oldQueue) {
- return oldQueue.getQueueName();
+ FSAppAttempt attempt = (FSAppAttempt) app.getCurrentAppAttempt();
+ // To serialize with FairScheduler#allocate, take the app attempt's write lock
+
+ try {
+ attempt.getWriteLock().lock();
+ FSLeafQueue oldQueue = (FSLeafQueue) app.getQueue();
+ String destQueueName = handleMoveToPlanQueue(queueName);
+ FSLeafQueue targetQueue = queueMgr.getLeafQueue(destQueueName, false);
+ if (targetQueue == null) {
+ throw new YarnException("Target queue " + queueName
+ + " not found or is not a leaf queue.");
+ }
+ if (targetQueue == oldQueue) {
+ return oldQueue.getQueueName();
+ }
+
+ if (oldQueue.isRunnableApp(attempt)) {
+ verifyMoveDoesNotViolateConstraints(attempt, oldQueue, targetQueue);
+ }
+
+ executeMove(app, attempt, oldQueue, targetQueue);
+ return targetQueue.getQueueName();
+ } finally {
+ attempt.getWriteLock().unlock();
}
-
- if (oldQueue.isRunnableApp(attempt)) {
- verifyMoveDoesNotViolateConstraints(attempt, oldQueue, targetQueue);
- }
-
- executeMove(app, attempt, oldQueue, targetQueue);
- return targetQueue.getQueueName();
+ } finally {
+ writeLock.unlock();
}
}
@@ -1737,12 +1824,17 @@ public class FairScheduler extends
* Process resource update on a node and update Queue.
*/
@Override
- public synchronized void updateNodeResource(RMNode nm,
+ public void updateNodeResource(RMNode nm,
ResourceOption resourceOption) {
- super.updateNodeResource(nm, resourceOption);
- updateRootQueueMetrics();
- queueMgr.getRootQueue().setSteadyFairShare(getClusterResource());
- queueMgr.getRootQueue().recomputeSteadyShares();
+ try {
+ writeLock.lock();
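+ // Recompute root queue metrics and steady fair shares under the write
+ // lock so they see a consistent cluster resource.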
+ super.updateNodeResource(nm, resourceOption);
+ updateRootQueueMetrics();
+ queueMgr.getRootQueue().setSteadyFairShare(getClusterResource());
+ queueMgr.getRootQueue().recomputeSteadyShares();
+ } finally {
+ writeLock.unlock();
+ }
}
/** {@inheritDoc} */