YARN-8990. Fix fair scheduler race condition in app submit and queue cleanup. (Contributed by Wilfred Spiegelenburg) (#3254)

(cherry picked from commit 524a7523c4)

 Conflicts:
	hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java

Co-authored-by: Haibo Chen <haibochen@apache.org>
This commit is contained in:
Akira Ajisaka 2021-08-03 13:12:01 +09:00 committed by GitHub
parent 521c1085b4
commit 786a43e729
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 104 additions and 42 deletions

View File

@ -652,4 +652,18 @@ public class FSLeafQueue extends FSQueue {
writeLock.unlock(); writeLock.unlock();
} }
} }
/**
* This method is called when an application is removed from this queue
* during the submit process.
* @param applicationId the application's id
*/
public void removeAssignedApp(ApplicationId applicationId) {
writeLock.lock();
try {
assignedApps.remove(applicationId);
} finally {
writeLock.unlock();
}
}
} }

View File

@ -479,7 +479,7 @@ public class FairScheduler extends
RMApp rmApp = rmContext.getRMApps().get(applicationId); RMApp rmApp = rmContext.getRMApps().get(applicationId);
// This will re-create the queue on restore, however this could fail if // This will re-create the queue on restore, however this could fail if
// the config was changed. // the config was changed.
FSLeafQueue queue = assignToQueue(rmApp, queueName, user); FSLeafQueue queue = assignToQueue(rmApp, queueName, user, applicationId);
if (queue == null) { if (queue == null) {
if (!isAppRecovering) { if (!isAppRecovering) {
return; return;
@ -516,6 +516,7 @@ public class FairScheduler extends
applicationId, queue.getName(), applicationId, queue.getName(),
invalidAMResourceRequests, queue.getMaxShare()); invalidAMResourceRequests, queue.getMaxShare());
rejectApplicationWithMessage(applicationId, msg); rejectApplicationWithMessage(applicationId, msg);
queue.removeAssignedApp(applicationId);
return; return;
} }
} }
@ -533,6 +534,7 @@ public class FairScheduler extends
+ " cannot submit applications to queue " + queue.getName() + " cannot submit applications to queue " + queue.getName()
+ "(requested queuename is " + queueName + ")"; + "(requested queuename is " + queueName + ")";
rejectApplicationWithMessage(applicationId, msg); rejectApplicationWithMessage(applicationId, msg);
queue.removeAssignedApp(applicationId);
return; return;
} }
} }
@ -541,7 +543,6 @@ public class FairScheduler extends
new SchedulerApplication<FSAppAttempt>(queue, user); new SchedulerApplication<FSAppAttempt>(queue, user);
applications.put(applicationId, application); applications.put(applicationId, application);
queue.getMetrics().submitApp(user); queue.getMetrics().submitApp(user);
queue.addAssignedApp(applicationId);
LOG.info("Accepted application " + applicationId + " from user: " + user LOG.info("Accepted application " + applicationId + " from user: " + user
+ ", in queue: " + queue.getName() + ", in queue: " + queue.getName()
@ -618,11 +619,19 @@ public class FairScheduler extends
} }
/** /**
* Helper method that attempts to assign the app to a queue. The method is * Helper method for the tests to assign the app to a queue.
* responsible to call the appropriate event-handler if the app is rejected.
*/ */
@VisibleForTesting @VisibleForTesting
FSLeafQueue assignToQueue(RMApp rmApp, String queueName, String user) { FSLeafQueue assignToQueue(RMApp rmApp, String queueName, String user) {
return assignToQueue(rmApp, queueName, user, null);
}
/**
* Helper method that attempts to assign the app to a queue. The method is
* responsible to call the appropriate event-handler if the app is rejected.
*/
private FSLeafQueue assignToQueue(RMApp rmApp, String queueName, String user,
ApplicationId applicationId) {
FSLeafQueue queue = null; FSLeafQueue queue = null;
String appRejectMsg = null; String appRejectMsg = null;
@ -632,7 +641,7 @@ public class FairScheduler extends
if (queueName == null) { if (queueName == null) {
appRejectMsg = "Application rejected by queue placement policy"; appRejectMsg = "Application rejected by queue placement policy";
} else { } else {
queue = queueMgr.getLeafQueue(queueName, true); queue = queueMgr.getLeafQueue(queueName, true, applicationId);
if (queue == null) { if (queue == null) {
appRejectMsg = queueName + " is not a leaf queue"; appRejectMsg = queueName + " is not a leaf queue";
} }

View File

@ -38,6 +38,7 @@ import org.slf4j.LoggerFactory;
import org.apache.hadoop.classification.InterfaceAudience.Private; import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceStability.Unstable; import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.policies.FifoPolicy; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.policies.FifoPolicy;
import org.xml.sax.SAXException; import org.xml.sax.SAXException;
@ -71,7 +72,7 @@ public class QueueManager {
Boolean removed = Boolean removed =
removeEmptyIncompatibleQueues(queueToCreate, queueType).orElse(null); removeEmptyIncompatibleQueues(queueToCreate, queueType).orElse(null);
if (Boolean.TRUE.equals(removed)) { if (Boolean.TRUE.equals(removed)) {
FSQueue queue = getQueue(queueToCreate, true, queueType, false); FSQueue queue = getQueue(queueToCreate, true, queueType, false, null);
if (queue != null && if (queue != null &&
// if queueToCreate is present in the allocation config, set it // if queueToCreate is present in the allocation config, set it
// to static // to static
@ -124,30 +125,49 @@ public class QueueManager {
/** /**
* Get a leaf queue by name, creating it if the create param is * Get a leaf queue by name, creating it if the create param is
* true and is necessary. * <code>true</code> and the queue does not exist.
* If the queue is not or can not be a leaf queue, i.e. it already exists as a * If the queue is not or can not be a leaf queue, i.e. it already exists as
* parent queue, or one of the parents in its name is already a leaf queue, * a parent queue, or one of the parents in its name is already a leaf queue,
* null is returned. * <code>null</code> is returned.
* *
* The root part of the name is optional, so a queue underneath the root * The root part of the name is optional, so a queue underneath the root
* named "queue1" could be referred to as just "queue1", and a queue named * named "queue1" could be referred to as just "queue1", and a queue named
* "queue2" underneath a parent named "parent1" that is underneath the root * "queue2" underneath a parent named "parent1" that is underneath the root
* could be referred to as just "parent1.queue2". * could be referred to as just "parent1.queue2".
* @param name name of the queue
* @param create <code>true</code> if the queue must be created if it does
* not exist, <code>false</code> otherwise
* @return the leaf queue or <code>null</code> if the queue cannot be found
*/ */
public FSLeafQueue getLeafQueue(String name, boolean create) { public FSLeafQueue getLeafQueue(String name, boolean create) {
return getLeafQueue(name, create, true); return getLeafQueue(name, create, null, true);
} }
private FSLeafQueue getLeafQueue( /**
String name, * Get a leaf queue by name, creating it if the create param is
boolean create, * <code>true</code> and the queue does not exist.
boolean recomputeSteadyShares) { * If the queue is not or can not be a leaf queue, i.e. it already exists as
FSQueue queue = getQueue( * a parent queue, or one of the parents in its name is already a leaf queue,
name, * <code>null</code> is returned.
create, *
FSQueueType.LEAF, * If the application will be assigned to the queue if the applicationId is
recomputeSteadyShares * not <code>null</code>
); * @param name name of the queue
* @param create <code>true</code> if the queue must be created if it does
* not exist, <code>false</code> otherwise
* @param applicationId the application ID to assign to the queue
* @return the leaf queue or <code>null</code> if teh queue cannot be found
*/
public FSLeafQueue getLeafQueue(String name, boolean create,
ApplicationId applicationId) {
return getLeafQueue(name, create, applicationId, true);
}
private FSLeafQueue getLeafQueue(String name, boolean create,
ApplicationId applicationId,
boolean recomputeSteadyShares) {
FSQueue queue = getQueue(name, create, FSQueueType.LEAF,
recomputeSteadyShares, applicationId);
if (queue instanceof FSParentQueue) { if (queue instanceof FSParentQueue) {
return null; return null;
} }
@ -168,42 +188,55 @@ public class QueueManager {
/** /**
* Get a parent queue by name, creating it if the create param is * Get a parent queue by name, creating it if the create param is
* true and is necessary. * <code>true</code> and the queue does not exist.
* If the queue is not or can not be a parent queue, * If the queue is not or can not be a parent queue, i.e. it already exists
* i.e. it already exists as a * as a leaf queue, or one of the parents in its name is already a leaf
* leaf queue, or one of the parents in its name is already a leaf queue, * queue, <code>null</code> is returned.
* null is returned.
* *
* The root part of the name is optional, so a queue underneath the root * The root part of the name is optional, so a queue underneath the root
* named "queue1" could be referred to as just "queue1", and a queue named * named "queue1" could be referred to as just "queue1", and a queue named
* "queue2" underneath a parent named "parent1" that is underneath the root * "queue2" underneath a parent named "parent1" that is underneath the root
* could be referred to as just "parent1.queue2". * could be referred to as just "parent1.queue2".
* @param name name of the queue
* @param create <code>true</code> if the queue must be created if it does
* not exist, <code>false</code> otherwise
* @return the parent queue or <code>null</code> if the queue cannot be found
*/ */
public FSParentQueue getParentQueue(String name, boolean create) { public FSParentQueue getParentQueue(String name, boolean create) {
return getParentQueue(name, create, true); return getParentQueue(name, create, true);
} }
public FSParentQueue getParentQueue( /**
String name, * Get a parent queue by name, creating it if the create param is
boolean create, * <code>true</code> and the queue does not exist.
* If the queue is not or can not be a parent queue, i.e. it already exists
* as a leaf queue, or one of the parents in its name is already a leaf
* queue, <code>null</code> is returned.
*
* The root part of the name is optional, so a queue underneath the root
* named "queue1" could be referred to as just "queue1", and a queue named
* "queue2" underneath a parent named "parent1" that is underneath the root
* could be referred to as just "parent1.queue2".
* @param name name of the queue
* @param create <code>true</code> if the queue must be created if it does
* not exist, <code>false</code> otherwise
* @param recomputeSteadyShares <code>true</code> if the steady fair share
* should be recalculated when a queue is added,
* <code>false</code> otherwise
* @return the parent queue or <code>null</code> if the queue cannot be found
*/
public FSParentQueue getParentQueue(String name, boolean create,
boolean recomputeSteadyShares) { boolean recomputeSteadyShares) {
FSQueue queue = getQueue( FSQueue queue = getQueue(name, create, FSQueueType.PARENT,
name, recomputeSteadyShares, null);
create,
FSQueueType.PARENT,
recomputeSteadyShares
);
if (queue instanceof FSLeafQueue) { if (queue instanceof FSLeafQueue) {
return null; return null;
} }
return (FSParentQueue) queue; return (FSParentQueue) queue;
} }
private FSQueue getQueue( private FSQueue getQueue(String name, boolean create, FSQueueType queueType,
String name, boolean recomputeSteadyShares, ApplicationId applicationId) {
boolean create,
FSQueueType queueType,
boolean recomputeSteadyShares) {
boolean recompute = recomputeSteadyShares; boolean recompute = recomputeSteadyShares;
name = ensureRootPrefix(name); name = ensureRootPrefix(name);
FSQueue queue; FSQueue queue;
@ -215,8 +248,14 @@ public class QueueManager {
} else { } else {
recompute = false; recompute = false;
} }
// At this point the queue exists and we need to assign the app if to the
// but only to a leaf queue
if (applicationId != null && queue instanceof FSLeafQueue) {
((FSLeafQueue)queue).addAssignedApp(applicationId);
}
} }
if (recompute) { // Don't recompute if it is an existing queue or no change was made
if (recompute && queue != null) {
rootQueue.recomputeSteadyShares(); rootQueue.recomputeSteadyShares();
} }
return queue; return queue;
@ -614,7 +653,7 @@ public class QueueManager {
incompatibleQueuesPendingRemoval.add( incompatibleQueuesPendingRemoval.add(
new IncompatibleQueueRemovalTask(name, queueType)); new IncompatibleQueueRemovalTask(name, queueType));
} else { } else {
FSQueue queue = getQueue(name, true, queueType, false); FSQueue queue = getQueue(name, true, queueType, false, null);
if (queue != null) { if (queue != null) {
queue.setDynamic(false); queue.setDynamic(false);
} }