YARN-7419. CapacityScheduler: Allow auto leaf queue creation after queue mapping. (Suma Shivaprasad via wangda)

Change-Id: Ia1704bb8cb5070e5b180b5a85787d7b9ca57ebc6
This commit is contained in:
Wangda Tan 2017-11-16 11:22:48 -08:00
parent f2efaf013f
commit 0987a7b8cb
21 changed files with 1921 additions and 265 deletions

View File

@ -360,13 +360,8 @@ public class RMAppManager implements EventHandler<RMAppManagerEvent>,
private RMAppImpl createAndPopulateNewRMApp( private RMAppImpl createAndPopulateNewRMApp(
ApplicationSubmissionContext submissionContext, long submitTime, ApplicationSubmissionContext submissionContext, long submitTime,
String user, boolean isRecovery, long startTime) throws YarnException { String user, boolean isRecovery, long startTime) throws YarnException {
if (!isRecovery) { if (!isRecovery) {
// Do queue mapping
if (rmContext.getQueuePlacementManager() != null) {
// We only do queue mapping when it's a new application
rmContext.getQueuePlacementManager().placeApplication(
submissionContext, user);
}
// fail the submission if configured application timeout value is invalid // fail the submission if configured application timeout value is invalid
RMServerUtils.validateApplicationTimeouts( RMServerUtils.validateApplicationTimeouts(
submissionContext.getApplicationTimeouts()); submissionContext.getApplicationTimeouts());

View File

@ -0,0 +1,52 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager.placement;
/**
* Each placement rule when it successfully places an application onto a queue
* returns a PlacementRuleContext which encapsulates the queue the
* application was mapped to and any parent queue for the queue (if configured)
*/
public class ApplicationPlacementContext {
private String queue;
private String parentQueue;
public ApplicationPlacementContext(String queue) {
this(queue,null);
}
public ApplicationPlacementContext(String queue, String parentQueue) {
this.queue = queue;
this.parentQueue = parentQueue;
}
public String getQueue() {
return queue;
}
public String getParentQueue() {
return parentQueue;
}
public boolean hasParentQueue() {
return parentQueue != null;
}
}

View File

@ -23,7 +23,6 @@ import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock; import java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock;
import java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock;
import org.apache.commons.lang.StringUtils;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext; import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
@ -53,36 +52,33 @@ public class PlacementManager {
} }
} }
public void placeApplication(ApplicationSubmissionContext asc, String user) public ApplicationPlacementContext placeApplication(
throws YarnException { ApplicationSubmissionContext asc, String user) throws YarnException {
try { try {
readLock.lock(); readLock.lock();
if (null == rules || rules.isEmpty()) { if (null == rules || rules.isEmpty()) {
return; return null;
} }
String newQueueName = null; ApplicationPlacementContext placement = null;
for (PlacementRule rule : rules) { for (PlacementRule rule : rules) {
newQueueName = rule.getQueueForApp(asc, user); placement = rule.getPlacementForApp(asc, user);
if (newQueueName != null) { if (placement != null) {
break; break;
} }
} }
// Failed to get where to place application // Failed to get where to place application
if (null == newQueueName && null == asc.getQueue()) { if (null == placement && null == asc.getQueue()) {
String msg = "Failed to get where to place application=" String msg = "Failed to get where to place application=" + asc
+ asc.getApplicationId(); .getApplicationId();
LOG.error(msg); LOG.error(msg);
throw new YarnException(msg); throw new YarnException(msg);
} }
// Set it to ApplicationSubmissionContext return placement;
if (!StringUtils.equals(asc.getQueue(), newQueueName)) {
LOG.info("Placed application=" + asc.getApplicationId() + " to queue="
+ newQueueName + ", original queue=" + asc.getQueue());
asc.setQueue(newQueueName);
}
} finally { } finally {
readLock.unlock(); readLock.unlock();
} }

View File

@ -25,6 +25,7 @@ import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext; import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
public abstract class PlacementRule { public abstract class PlacementRule {
public String getName() { public String getName() {
return this.getClass().getName(); return this.getClass().getName();
} }
@ -50,6 +51,6 @@ public abstract class PlacementRule {
* in the {@link PlacementManager} will take care * in the {@link PlacementManager} will take care
* </p> * </p>
*/ */
public abstract String getQueueForApp(ApplicationSubmissionContext asc, public abstract ApplicationPlacementContext getPlacementForApp(
String user) throws YarnException; ApplicationSubmissionContext asc, String user) throws YarnException;
} }

View File

@ -19,8 +19,10 @@
package org.apache.hadoop.yarn.server.resourcemanager.placement; package org.apache.hadoop.yarn.server.resourcemanager.placement;
import java.io.IOException; import java.io.IOException;
import java.util.ArrayList;
import java.util.List; import java.util.List;
import org.apache.commons.lang.StringUtils;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience.Private; import org.apache.hadoop.classification.InterfaceAudience.Private;
@ -32,6 +34,15 @@ import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.server.resourcemanager.placement.UserGroupMappingPlacementRule.QueueMapping.MappingType; import org.apache.hadoop.yarn.server.resourcemanager.placement.UserGroupMappingPlacementRule.QueueMapping.MappingType;
import com.google.common.annotations.VisibleForTesting; import com.google.common.annotations.VisibleForTesting;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.AutoCreatedLeafQueue;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CSQueue;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerContext;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerQueueManager;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.LeafQueue;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.ManagedParentQueue;
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration.DOT;
public class UserGroupMappingPlacementRule extends PlacementRule { public class UserGroupMappingPlacementRule extends PlacementRule {
private static final Log LOG = LogFactory private static final Log LOG = LogFactory
@ -66,17 +77,41 @@ public class UserGroupMappingPlacementRule extends PlacementRule {
MappingType type; MappingType type;
String source; String source;
String queue; String queue;
String parentQueue;
public final static String DELIMITER = ":";
public QueueMapping(MappingType type, String source, String queue) { public QueueMapping(MappingType type, String source, String queue) {
this.type = type; this.type = type;
this.source = source; this.source = source;
this.queue = queue; this.queue = queue;
this.parentQueue = null;
}
public QueueMapping(MappingType type, String source,
String queue, String parentQueue) {
this.type = type;
this.source = source;
this.queue = queue;
this.parentQueue = parentQueue;
} }
public String getQueue() { public String getQueue() {
return queue; return queue;
} }
public String getParentQueue() {
return parentQueue;
}
public MappingType getType() {
return type;
}
public String getSource() {
return source;
}
@Override @Override
public int hashCode() { public int hashCode() {
return super.hashCode(); return super.hashCode();
@ -93,6 +128,13 @@ public class UserGroupMappingPlacementRule extends PlacementRule {
return false; return false;
} }
} }
public String toString() {
return type.toString() + DELIMITER + source + DELIMITER +
(parentQueue != null ?
parentQueue + "." + queue :
queue);
}
} }
public UserGroupMappingPlacementRule(boolean overrideWithQueueMappings, public UserGroupMappingPlacementRule(boolean overrideWithQueueMappings,
@ -102,26 +144,27 @@ public class UserGroupMappingPlacementRule extends PlacementRule {
this.groups = groups; this.groups = groups;
} }
private String getMappedQueue(String user) throws IOException { private ApplicationPlacementContext getPlacementForUser(String user)
throws IOException {
for (QueueMapping mapping : mappings) { for (QueueMapping mapping : mappings) {
if (mapping.type == MappingType.USER) { if (mapping.type == MappingType.USER) {
if (mapping.source.equals(CURRENT_USER_MAPPING)) { if (mapping.source.equals(CURRENT_USER_MAPPING)) {
if (mapping.queue.equals(CURRENT_USER_MAPPING)) { if (mapping.queue.equals(CURRENT_USER_MAPPING)) {
return user; return getPlacementContext(mapping, user);
} else if (mapping.queue.equals(PRIMARY_GROUP_MAPPING)) { } else if (mapping.queue.equals(PRIMARY_GROUP_MAPPING)) {
return groups.getGroups(user).get(0); return getPlacementContext(mapping, groups.getGroups(user).get(0));
} else { } else {
return mapping.queue; return getPlacementContext(mapping);
} }
} }
if (user.equals(mapping.source)) { if (user.equals(mapping.source)) {
return mapping.queue; return getPlacementContext(mapping);
} }
} }
if (mapping.type == MappingType.GROUP) { if (mapping.type == MappingType.GROUP) {
for (String userGroups : groups.getGroups(user)) { for (String userGroups : groups.getGroups(user)) {
if (userGroups.equals(mapping.source)) { if (userGroups.equals(mapping.source)) {
return mapping.queue; return getPlacementContext(mapping);
} }
} }
} }
@ -130,13 +173,14 @@ public class UserGroupMappingPlacementRule extends PlacementRule {
} }
@Override @Override
public String getQueueForApp(ApplicationSubmissionContext asc, String user) public ApplicationPlacementContext getPlacementForApp(
ApplicationSubmissionContext asc, String user)
throws YarnException { throws YarnException {
String queueName = asc.getQueue(); String queueName = asc.getQueue();
ApplicationId applicationId = asc.getApplicationId(); ApplicationId applicationId = asc.getApplicationId();
if (mappings != null && mappings.size() > 0) { if (mappings != null && mappings.size() > 0) {
try { try {
String mappedQueue = getMappedQueue(user); ApplicationPlacementContext mappedQueue = getPlacementForUser(user);
if (mappedQueue != null) { if (mappedQueue != null) {
// We have a mapping, should we use it? // We have a mapping, should we use it?
if (queueName.equals(YarnConfiguration.DEFAULT_QUEUE_NAME) if (queueName.equals(YarnConfiguration.DEFAULT_QUEUE_NAME)
@ -153,8 +197,222 @@ public class UserGroupMappingPlacementRule extends PlacementRule {
throw new YarnException(message); throw new YarnException(message);
} }
} }
return null;
}
return queueName; private ApplicationPlacementContext getPlacementContext(
QueueMapping mapping) {
return getPlacementContext(mapping, mapping.getQueue());
}
private ApplicationPlacementContext getPlacementContext(QueueMapping mapping,
String leafQueueName) {
if (!StringUtils.isEmpty(mapping.parentQueue)) {
return new ApplicationPlacementContext(leafQueueName,
mapping.getParentQueue());
} else{
return new ApplicationPlacementContext(leafQueueName);
}
}
@VisibleForTesting
public static UserGroupMappingPlacementRule get(
CapacitySchedulerContext schedulerContext) throws IOException {
CapacitySchedulerConfiguration conf = schedulerContext.getConfiguration();
boolean overrideWithQueueMappings = conf.getOverrideWithQueueMappings();
LOG.info(
"Initialized queue mappings, override: " + overrideWithQueueMappings);
List<QueueMapping> queueMappings = conf.getQueueMappings();
// Get new user/group mappings
List<QueueMapping> newMappings = new ArrayList<>();
CapacitySchedulerQueueManager queueManager =
schedulerContext.getCapacitySchedulerQueueManager();
// check if mappings refer to valid queues
for (QueueMapping mapping : queueMappings) {
QueuePath queuePath = extractQueuePath(mapping.getQueue());
if (isStaticQueueMapping(mapping)) {
//Try getting queue by its leaf queue name
// without splitting into parent/leaf queues
CSQueue queue = queueManager.getQueue(mapping.getQueue());
if (ifQueueDoesNotExist(queue)) {
//Try getting the queue by extracting leaf and parent queue names
//Assuming its a potential auto created leaf queue
queue = queueManager.getQueue(queuePath.getLeafQueue());
if (ifQueueDoesNotExist(queue)) {
//if leaf queue does not exist,
// this could be a potential auto created leaf queue
//validate if parent queue is specified,
// then it should exist and
// be an instance of AutoCreateEnabledParentQueue
QueueMapping newMapping = validateAndGetAutoCreatedQueueMapping(
queueManager, mapping, queuePath);
if (newMapping == null) {
throw new IOException(
"mapping contains invalid or non-leaf queue " + mapping
.getQueue());
}
newMappings.add(newMapping);
} else{
QueueMapping newMapping = validateAndGetQueueMapping(queueManager,
queue, mapping, queuePath);
newMappings.add(newMapping);
}
} else{
// if queue exists, validate
// if its an instance of leaf queue
// if its an instance of auto created leaf queue,
// then extract parent queue name and update queue mapping
QueueMapping newMapping = validateAndGetQueueMapping(queueManager,
queue, mapping, queuePath);
newMappings.add(newMapping);
}
} else{
//If it is a dynamic queue mapping,
// we can safely assume leaf queue name does not have '.' in it
// validate
// if parent queue is specified, then
// parent queue exists and an instance of AutoCreateEnabledParentQueue
//
QueueMapping newMapping = validateAndGetAutoCreatedQueueMapping(
queueManager, mapping, queuePath);
if (newMapping != null) {
newMappings.add(newMapping);
} else{
newMappings.add(mapping);
}
}
}
// initialize groups if mappings are present
if (newMappings.size() > 0) {
Groups groups = new Groups(conf);
return new UserGroupMappingPlacementRule(overrideWithQueueMappings,
newMappings, groups);
}
return null;
}
private static QueueMapping validateAndGetQueueMapping(
CapacitySchedulerQueueManager queueManager, CSQueue queue,
QueueMapping mapping, QueuePath queuePath) throws IOException {
if (!(queue instanceof LeafQueue)) {
throw new IOException(
"mapping contains invalid or non-leaf queue : " + mapping.getQueue());
}
if (queue instanceof AutoCreatedLeafQueue && queue
.getParent() instanceof ManagedParentQueue) {
QueueMapping newMapping = validateAndGetAutoCreatedQueueMapping(
queueManager, mapping, queuePath);
if (newMapping == null) {
throw new IOException(
"mapping contains invalid or non-leaf queue " + mapping.getQueue());
}
return newMapping;
}
return mapping;
}
private static boolean ifQueueDoesNotExist(CSQueue queue) {
return queue == null;
}
private static QueueMapping validateAndGetAutoCreatedQueueMapping(
CapacitySchedulerQueueManager queueManager, QueueMapping mapping,
QueuePath queuePath) throws IOException {
if (queuePath.hasParentQueue()) {
//if parent queue is specified,
// then it should exist and be an instance of ManagedParentQueue
validateParentQueue(queueManager.getQueue(queuePath.getParentQueue()),
queuePath.getParentQueue(), queuePath.getLeafQueue());
return new QueueMapping(mapping.getType(), mapping.getSource(),
queuePath.getLeafQueue(), queuePath.getParentQueue());
}
return null;
}
private static boolean isStaticQueueMapping(QueueMapping mapping) {
return !mapping.getQueue().contains(
UserGroupMappingPlacementRule.CURRENT_USER_MAPPING) && !mapping
.getQueue().contains(
UserGroupMappingPlacementRule.PRIMARY_GROUP_MAPPING);
}
private static class QueuePath {
public String parentQueue;
public String leafQueue;
public QueuePath(final String leafQueue) {
this.leafQueue = leafQueue;
}
public QueuePath(final String parentQueue, final String leafQueue) {
this.parentQueue = parentQueue;
this.leafQueue = leafQueue;
}
public String getParentQueue() {
return parentQueue;
}
public String getLeafQueue() {
return leafQueue;
}
public boolean hasParentQueue() {
return parentQueue != null;
}
@Override
public String toString() {
return parentQueue + DOT + leafQueue;
}
}
private static QueuePath extractQueuePath(String queueName)
throws IOException {
int parentQueueNameEndIndex = queueName.lastIndexOf(DOT);
if (parentQueueNameEndIndex > -1) {
final String parentQueue = queueName.substring(0, parentQueueNameEndIndex)
.trim();
final String leafQueue = queueName.substring(parentQueueNameEndIndex + 1)
.trim();
return new QueuePath(parentQueue, leafQueue);
}
return new QueuePath(queueName);
}
private static void validateParentQueue(CSQueue parentQueue,
String parentQueueName, String leafQueueName) throws IOException {
if (parentQueue == null) {
throw new IOException(
"mapping contains invalid or non-leaf queue [" + leafQueueName
+ "] and invalid parent queue [" + parentQueueName + "]");
} else if (!(parentQueue instanceof ManagedParentQueue)) {
throw new IOException("mapping contains leaf queue [" + leafQueueName
+ "] and invalid parent queue which "
+ "does not have auto creation of leaf queues enabled ["
+ parentQueueName + "]");
} else if (!parentQueue.getQueueName().equals(parentQueueName)) {
throw new IOException(
"mapping contains invalid or non-leaf queue [" + leafQueueName
+ "] and invalid parent queue "
+ "which does not match existing leaf queue's parent : ["
+ parentQueueName + "] does not match [ " + parentQueue
.getQueueName() + "]");
}
} }
@VisibleForTesting @VisibleForTesting

View File

@ -68,6 +68,7 @@ import org.apache.hadoop.yarn.api.records.YarnApplicationState;
import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.event.Dispatcher; import org.apache.hadoop.yarn.event.Dispatcher;
import org.apache.hadoop.yarn.event.EventHandler; import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
import org.apache.hadoop.yarn.security.AMRMTokenIdentifier; import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
import org.apache.hadoop.yarn.security.client.ClientToAMTokenIdentifier; import org.apache.hadoop.yarn.security.client.ClientToAMTokenIdentifier;
@ -83,6 +84,8 @@ import org.apache.hadoop.yarn.server.resourcemanager.RMServerUtils;
import org.apache.hadoop.yarn.server.resourcemanager.blacklist.BlacklistManager; import org.apache.hadoop.yarn.server.resourcemanager.blacklist.BlacklistManager;
import org.apache.hadoop.yarn.server.resourcemanager.blacklist.DisabledBlacklistManager; import org.apache.hadoop.yarn.server.resourcemanager.blacklist.DisabledBlacklistManager;
import org.apache.hadoop.yarn.server.resourcemanager.blacklist.SimpleBlacklistManager; import org.apache.hadoop.yarn.server.resourcemanager.blacklist.SimpleBlacklistManager;
import org.apache.hadoop.yarn.server.resourcemanager.placement.ApplicationPlacementContext;
import org.apache.hadoop.yarn.server.resourcemanager.placement.PlacementManager;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.RMState; import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.RMState;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.Recoverable; import org.apache.hadoop.yarn.server.resourcemanager.recovery.Recoverable;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationStateData; import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationStateData;
@ -158,6 +161,8 @@ public class RMAppImpl implements RMApp, Recoverable {
private boolean isNumAttemptsBeyondThreshold = false; private boolean isNumAttemptsBeyondThreshold = false;
// Mutable fields // Mutable fields
private long startTime; private long startTime;
private long finishTime = 0; private long finishTime = 0;
@ -1073,38 +1078,51 @@ public class RMAppImpl implements RMApp, Recoverable {
app.getUser(), app.getUser(),
BuilderUtils.parseTokensConf(app.submissionContext)); BuilderUtils.parseTokensConf(app.submissionContext));
} catch (Exception e) { } catch (Exception e) {
String msg = "Failed to fetch user credentials from application:" String msg = "Failed to fetch user credentials from application:" + e
+ e.getMessage(); .getMessage();
app.diagnostics.append(msg); app.diagnostics.append(msg);
LOG.error(msg, e); LOG.error(msg, e);
} }
} }
for (Map.Entry<ApplicationTimeoutType, Long> timeout : for (Map.Entry<ApplicationTimeoutType, Long> timeout : app.applicationTimeouts
app.applicationTimeouts.entrySet()) { .entrySet()) {
app.rmContext.getRMAppLifetimeMonitor().registerApp(app.applicationId, app.rmContext.getRMAppLifetimeMonitor().registerApp(app.applicationId,
timeout.getKey(), timeout.getValue()); timeout.getKey(), timeout.getValue());
if (LOG.isDebugEnabled()) { if (LOG.isDebugEnabled()) {
long remainingTime = timeout.getValue() - app.systemClock.getTime(); long remainingTime = timeout.getValue() - app.systemClock.getTime();
LOG.debug("Application " + app.applicationId LOG.debug("Application " + app.applicationId
+ " is registered for timeout monitor, type=" + timeout.getKey() + " is registered for timeout monitor, type=" + timeout.getKey()
+ " remaining timeout=" + " remaining timeout=" + (remainingTime > 0 ?
+ (remainingTime > 0 ? remainingTime / 1000 : 0) + " seconds"); remainingTime / 1000 :
0) + " seconds");
} }
} }
ApplicationPlacementContext placementContext = null;
try {
placementContext = placeApplication(app.rmContext,
app.submissionContext, app.user);
} catch (Exception e) {
String msg = "Failed to place application to queue :" + e.getMessage();
app.diagnostics.append(msg);
LOG.error(msg, e);
}
// No existent attempts means the attempt associated with this app was not // No existent attempts means the attempt associated with this app was not
// started or started but not yet saved. // started or started but not yet saved.
if (app.attempts.isEmpty()) { if (app.attempts.isEmpty()) {
app.scheduler.handle(new AppAddedSchedulerEvent(app.user, app.scheduler.handle(
app.submissionContext, false, app.applicationPriority)); new AppAddedSchedulerEvent(app.user, app.submissionContext, false,
app.applicationPriority, placementContext));
return RMAppState.SUBMITTED; return RMAppState.SUBMITTED;
} }
// Add application to scheduler synchronously to guarantee scheduler // Add application to scheduler synchronously to guarantee scheduler
// knows applications before AM or NM re-registers. // knows applications before AM or NM re-registers.
app.scheduler.handle(new AppAddedSchedulerEvent(app.user, app.scheduler.handle(
app.submissionContext, true, app.applicationPriority)); new AppAddedSchedulerEvent(app.user, app.submissionContext, true,
app.applicationPriority, placementContext));
// recover attempts // recover attempts
app.recoverAppAttempts(); app.recoverAppAttempts();
@ -1120,8 +1138,20 @@ public class RMAppImpl implements RMApp, Recoverable {
RMAppTransition { RMAppTransition {
@Override @Override
public void transition(RMAppImpl app, RMAppEvent event) { public void transition(RMAppImpl app, RMAppEvent event) {
app.handler.handle(new AppAddedSchedulerEvent(app.user, ApplicationPlacementContext placementContext = null;
app.submissionContext, false, app.applicationPriority)); try {
placementContext = placeApplication(app.rmContext,
app.submissionContext, app.user);
replaceQueueFromPlacementContext(placementContext,
app.submissionContext);
} catch (YarnException e) {
String msg = "Failed to place application to queue :" + e.getMessage();
app.diagnostics.append(msg);
LOG.error(msg, e);
}
app.handler.handle(
new AppAddedSchedulerEvent(app.user, app.submissionContext, false,
app.applicationPriority, placementContext));
// send the ATS create Event // send the ATS create Event
app.sendATSCreateEvent(); app.sendATSCreateEvent();
} }
@ -2013,4 +2043,37 @@ public class RMAppImpl implements RMApp, Recoverable {
this.submissionContext.setAMContainerSpec(null); this.submissionContext.setAMContainerSpec(null);
this.submissionContext.setLogAggregationContext(null); this.submissionContext.setLogAggregationContext(null);
} }
@VisibleForTesting
static ApplicationPlacementContext placeApplication(RMContext rmContext,
ApplicationSubmissionContext context, String user) throws YarnException {
ApplicationPlacementContext placementContext = null;
PlacementManager placementManager = rmContext.getQueuePlacementManager();
if (placementManager != null) {
placementContext = placementManager.placeApplication(context, user);
} else{
LOG.error(
"Queue Placement Manager is null. Cannot place application :" + " "
+ context.getApplicationId() + " to queue ");
}
return placementContext;
}
static void replaceQueueFromPlacementContext(
ApplicationPlacementContext placementContext,
ApplicationSubmissionContext context) {
// Set it to ApplicationSubmissionContext
//apply queue mapping only to new application submissions
if (placementContext != null && !StringUtils.equals(context.getQueue(),
placementContext.getQueue())) {
LOG.info("Placed application=" + context.getApplicationId() + " to queue="
+ placementContext.getQueue() + ", original queue=" + context
.getQueue());
context.setQueue(placementContext.getQueue());
}
}
} }

View File

@ -35,31 +35,13 @@ public abstract class AbstractManagedParentQueue extends ParentQueue {
private static final Logger LOG = LoggerFactory.getLogger( private static final Logger LOG = LoggerFactory.getLogger(
AbstractManagedParentQueue.class); AbstractManagedParentQueue.class);
private int maxAppsForAutoCreatedQueues; protected AutoCreatedLeafQueueTemplate leafQueueTemplate;
private int maxAppsPerUserForAutoCreatedQueues;
private int userLimit;
private float userLimitFactor;
public AbstractManagedParentQueue(CapacitySchedulerContext cs, public AbstractManagedParentQueue(CapacitySchedulerContext cs,
String queueName, CSQueue parent, CSQueue old) throws IOException { String queueName, CSQueue parent, CSQueue old) throws IOException {
super(cs, queueName, parent, old); super(cs, queueName, parent, old);
super.setupQueueConfigs(csContext.getClusterResource()); super.setupQueueConfigs(csContext.getClusterResource());
initializeLeafQueueConfigs();
StringBuffer queueInfo = new StringBuffer();
queueInfo.append("Created Managed Parent Queue: ").append(queueName)
.append("\nof type : [" + getClass())
.append("]\nwith capacity: [")
.append(super.getCapacity()).append("]\nwith max capacity: [")
.append(super.getMaximumCapacity()).append("\nwith max apps: [")
.append(getMaxApplicationsForAutoCreatedQueues())
.append("]\nwith max apps per user: [")
.append(getMaxApplicationsPerUserForAutoCreatedQueues())
.append("]\nwith user limit: [").append(getUserLimit())
.append("]\nwith user limit factor: [")
.append(getUserLimitFactor()).append("].");
LOG.info(queueInfo.toString());
} }
@Override @Override
@ -71,8 +53,6 @@ public abstract class AbstractManagedParentQueue extends ParentQueue {
// Set new configs // Set new configs
setupQueueConfigs(clusterResource); setupQueueConfigs(clusterResource);
initializeLeafQueueConfigs();
// run reinitialize on each existing queue, to trigger absolute cap // run reinitialize on each existing queue, to trigger absolute cap
// recomputations // recomputations
for (CSQueue res : this.getChildQueues()) { for (CSQueue res : this.getChildQueues()) {
@ -87,72 +67,29 @@ public abstract class AbstractManagedParentQueue extends ParentQueue {
* Initialize leaf queue configs from template configurations specified on * Initialize leaf queue configs from template configurations specified on
* parent queue. * parent queue.
*/ */
protected void initializeLeafQueueConfigs() { protected AutoCreatedLeafQueueTemplate.Builder initializeLeafQueueConfigs
(String queuePath) {
CapacitySchedulerConfiguration conf = csContext.getConfiguration(); CapacitySchedulerConfiguration conf = csContext.getConfiguration();
final String queuePath = super.getQueuePath(); AutoCreatedLeafQueueTemplate.Builder leafQueueTemplateBuilder = new
AutoCreatedLeafQueueTemplate.Builder();
int maxApps = conf.getMaximumApplicationsPerQueue(queuePath); int maxApps = conf.getMaximumApplicationsPerQueue(queuePath);
if (maxApps < 0) { if (maxApps < 0) {
maxApps = (int) ( maxApps = (int) (
CapacitySchedulerConfiguration.DEFAULT_MAXIMUM_SYSTEM_APPLICATIIONS CapacitySchedulerConfiguration.DEFAULT_MAXIMUM_SYSTEM_APPLICATIIONS
* getAbsoluteCapacity()); * getAbsoluteCapacity());
} }
userLimit = conf.getUserLimit(queuePath);
userLimitFactor = conf.getUserLimitFactor(queuePath);
maxAppsForAutoCreatedQueues = maxApps;
maxAppsPerUserForAutoCreatedQueues =
(int) (maxApps * (userLimit / 100.0f) * userLimitFactor);
} int userLimit = conf.getUserLimit(queuePath);
float userLimitFactor = conf.getUserLimitFactor(queuePath);
leafQueueTemplateBuilder.userLimit(userLimit)
.userLimitFactor(userLimitFactor)
.maxApps(maxApps)
.maxAppsPerUser(
(int) (maxApps * (userLimit / 100.0f) * userLimitFactor));
/** return leafQueueTemplateBuilder;
* Number of maximum applications for each of the auto created leaf queues.
*
* @return maxAppsForAutoCreatedQueues
*/
public int getMaxApplicationsForAutoCreatedQueues() {
return maxAppsForAutoCreatedQueues;
}
/**
* Number of maximum applications per user for each of the auto created
* leaf queues.
*
* @return maxAppsPerUserForAutoCreatedQueues
*/
public int getMaxApplicationsPerUserForAutoCreatedQueues() {
return maxAppsPerUserForAutoCreatedQueues;
}
/**
* User limit value for each of the auto created leaf queues.
*
* @return userLimit
*/
public int getUserLimitForAutoCreatedQueues() {
return userLimit;
}
/**
* User limit factor value for each of the auto created leaf queues.
*
* @return userLimitFactor
*/
public float getUserLimitFactor() {
return userLimitFactor;
}
public int getMaxAppsForAutoCreatedQueues() {
return maxAppsForAutoCreatedQueues;
}
public int getMaxAppsPerUserForAutoCreatedQueues() {
return maxAppsPerUserForAutoCreatedQueues;
}
public int getUserLimit() {
return userLimit;
} }
/** /**
@ -229,4 +166,111 @@ public abstract class AbstractManagedParentQueue extends ParentQueue {
} }
return childQueue; return childQueue;
} }
protected float sumOfChildCapacities() {
try {
writeLock.lock();
float ret = 0;
for (CSQueue l : childQueues) {
ret += l.getCapacity();
}
return ret;
} finally {
writeLock.unlock();
}
}
protected float sumOfChildAbsCapacities() {
try {
writeLock.lock();
float ret = 0;
for (CSQueue l : childQueues) {
ret += l.getAbsoluteCapacity();
}
return ret;
} finally {
writeLock.unlock();
}
}
public static class AutoCreatedLeafQueueTemplate {
private QueueCapacities queueCapacities;
private int maxApps;
private int maxAppsPerUser;
private int userLimit;
private float userLimitFactor;
AutoCreatedLeafQueueTemplate(Builder builder) {
this.maxApps = builder.maxApps;
this.maxAppsPerUser = builder.maxAppsPerUser;
this.userLimit = builder.userLimit;
this.userLimitFactor = builder.userLimitFactor;
this.queueCapacities = builder.queueCapacities;
}
public static class Builder {
private int maxApps;
private int maxAppsPerUser;
private int userLimit;
private float userLimitFactor;
private QueueCapacities queueCapacities;
Builder maxApps(int maxApplications) {
this.maxApps = maxApplications;
return this;
}
Builder maxAppsPerUser(int maxApplicationsPerUser) {
this.maxAppsPerUser = maxApplicationsPerUser;
return this;
}
Builder userLimit(int usrLimit) {
this.userLimit = usrLimit;
return this;
}
Builder userLimitFactor(float ulf) {
this.userLimitFactor = ulf;
return this;
}
Builder capacities(QueueCapacities capacities) {
this.queueCapacities = capacities;
return this;
}
AutoCreatedLeafQueueTemplate build() {
return new AutoCreatedLeafQueueTemplate(this);
}
}
public int getUserLimit() {
return userLimit;
}
public float getUserLimitFactor() {
return userLimitFactor;
}
public QueueCapacities getQueueCapacities() {
return queueCapacities;
}
public int getMaxApps() {
return maxApps;
}
public int getMaxAppsPerUser() {
return maxAppsPerUser;
}
}
public AutoCreatedLeafQueueTemplate getLeafQueueTemplate() {
return leafQueueTemplate;
}
} }

View File

@ -21,6 +21,8 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity;
import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerDynamicEditException; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerDynamicEditException;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.QueueEntitlement; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.QueueEntitlement;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity
.AbstractManagedParentQueue.AutoCreatedLeafQueueTemplate;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
@ -42,17 +44,18 @@ public class AutoCreatedLeafQueue extends LeafQueue {
AbstractManagedParentQueue parent) throws IOException { AbstractManagedParentQueue parent) throws IOException {
super(cs, queueName, parent, null); super(cs, queueName, parent, null);
updateApplicationAndUserLimits(parent.getUserLimitForAutoCreatedQueues(), AutoCreatedLeafQueueTemplate leafQueueTemplate =
parent.getUserLimitFactor(), parent.getLeafQueueTemplate();
parent.getMaxApplicationsForAutoCreatedQueues(), updateApplicationAndUserLimits(leafQueueTemplate.getUserLimit(),
parent.getMaxApplicationsPerUserForAutoCreatedQueues()); leafQueueTemplate.getUserLimitFactor(),
leafQueueTemplate.getMaxApps(),
leafQueueTemplate.getMaxAppsPerUser());
this.parent = parent; this.parent = parent;
} }
@Override @Override
public void reinitialize(CSQueue newlyParsedQueue, public void reinitialize(CSQueue newlyParsedQueue, Resource clusterResource)
Resource clusterResource) throws IOException { throws IOException {
try { try {
writeLock.lock(); writeLock.lock();
@ -62,10 +65,12 @@ public class AutoCreatedLeafQueue extends LeafQueue {
CSQueueUtils.updateQueueStatistics(resourceCalculator, clusterResource, CSQueueUtils.updateQueueStatistics(resourceCalculator, clusterResource,
this, labelManager, null); this, labelManager, null);
updateApplicationAndUserLimits(parent.getUserLimitForAutoCreatedQueues(), AutoCreatedLeafQueueTemplate leafQueueTemplate =
parent.getUserLimitFactor(), parent.getLeafQueueTemplate();
parent.getMaxApplicationsForAutoCreatedQueues(), updateApplicationAndUserLimits(leafQueueTemplate.getUserLimit(),
parent.getMaxApplicationsPerUserForAutoCreatedQueues()); leafQueueTemplate.getUserLimitFactor(),
leafQueueTemplate.getMaxApps(),
leafQueueTemplate.getMaxAppsPerUser());
} finally { } finally {
writeLock.unlock(); writeLock.unlock();

View File

@ -41,7 +41,6 @@ import org.apache.hadoop.classification.InterfaceStability.Evolving;
import org.apache.hadoop.conf.Configurable; import org.apache.hadoop.conf.Configurable;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.security.AccessControlException; import org.apache.hadoop.security.AccessControlException;
import org.apache.hadoop.security.Groups;
import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.util.Time; import org.apache.hadoop.util.Time;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
@ -64,10 +63,10 @@ import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
import org.apache.hadoop.yarn.proto.YarnServiceProtos.SchedulerResourceTypes; import org.apache.hadoop.yarn.proto.YarnServiceProtos.SchedulerResourceTypes;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext; import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager; import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
import org.apache.hadoop.yarn.server.resourcemanager.placement.ApplicationPlacementContext;
import org.apache.hadoop.yarn.server.resourcemanager.placement.PlacementFactory; import org.apache.hadoop.yarn.server.resourcemanager.placement.PlacementFactory;
import org.apache.hadoop.yarn.server.resourcemanager.placement.PlacementRule; import org.apache.hadoop.yarn.server.resourcemanager.placement.PlacementRule;
import org.apache.hadoop.yarn.server.resourcemanager.placement.UserGroupMappingPlacementRule; import org.apache.hadoop.yarn.server.resourcemanager.placement.UserGroupMappingPlacementRule;
import org.apache.hadoop.yarn.server.resourcemanager.placement.UserGroupMappingPlacementRule.QueueMapping;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.RMState; import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.RMState;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationStateData; import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationStateData;
import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationConstants; import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationConstants;
@ -146,6 +145,8 @@ import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions; import com.google.common.base.Preconditions;
import com.google.common.util.concurrent.SettableFuture; import com.google.common.util.concurrent.SettableFuture;
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration.QUEUE_MAPPING;
@LimitedPrivate("yarn") @LimitedPrivate("yarn")
@Evolving @Evolving
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
@ -560,44 +561,17 @@ public class CapacityScheduler extends
} }
@VisibleForTesting @VisibleForTesting
public UserGroupMappingPlacementRule public PlacementRule getUserGroupMappingPlacementRule() throws IOException {
getUserGroupMappingPlacementRule() throws IOException {
try { try {
readLock.lock(); readLock.lock();
boolean overrideWithQueueMappings = conf.getOverrideWithQueueMappings(); return UserGroupMappingPlacementRule.get(this);
LOG.info(
"Initialized queue mappings, override: " + overrideWithQueueMappings);
// Get new user/group mappings
List<QueueMapping> newMappings = conf.getQueueMappings();
// check if mappings refer to valid queues
for (QueueMapping mapping : newMappings) {
String mappingQueue = mapping.getQueue();
if (!mappingQueue.equals(
UserGroupMappingPlacementRule.CURRENT_USER_MAPPING) && !mappingQueue
.equals(UserGroupMappingPlacementRule.PRIMARY_GROUP_MAPPING)) {
CSQueue queue = getQueue(mappingQueue);
if (queue == null || !(queue instanceof LeafQueue)) {
throw new IOException(
"mapping contains invalid or non-leaf queue " + mappingQueue);
}
}
}
// initialize groups if mappings are present
if (newMappings.size() > 0) {
Groups groups = new Groups(conf);
return new UserGroupMappingPlacementRule(overrideWithQueueMappings,
newMappings, groups);
}
return null;
} finally { } finally {
readLock.unlock(); readLock.unlock();
} }
} }
private void updatePlacementRules() throws IOException { @VisibleForTesting
void updatePlacementRules() throws IOException {
// Initialize placement rules // Initialize placement rules
Collection<String> placementRuleStrs = conf.getStringCollection( Collection<String> placementRuleStrs = conf.getStringCollection(
YarnConfiguration.QUEUE_PLACEMENT_RULES); YarnConfiguration.QUEUE_PLACEMENT_RULES);
@ -731,37 +705,92 @@ public class CapacityScheduler extends
} }
} }
private void addApplication(ApplicationId applicationId, private void addApplication(ApplicationId applicationId, String queueName,
String queueName, String user, Priority priority) { String user, Priority priority,
ApplicationPlacementContext placementContext) {
try { try {
writeLock.lock(); writeLock.lock();
if (isSystemAppsLimitReached()) { if (isSystemAppsLimitReached()) {
String message = "Maximum system application limit reached," String message = "Maximum system application limit reached,"
+ "cannot accept submission of application: " + applicationId; + "cannot accept submission of application: " + applicationId;
this.rmContext.getDispatcher().getEventHandler().handle(new RMAppEvent( this.rmContext.getDispatcher().getEventHandler().handle(
applicationId, RMAppEventType.APP_REJECTED, message)); new RMAppEvent(applicationId, RMAppEventType.APP_REJECTED,
message));
return; return;
} }
// Sanity checks. // Sanity checks.
CSQueue queue = getQueue(queueName); CSQueue queue = getQueue(queueName);
if (queue == null && placementContext != null) {
//Could be a potential auto-created leaf queue
try {
queue = autoCreateLeafQueue(placementContext);
} catch (YarnException | IOException e) {
LOG.error("Could not auto-create leaf queue due to : ", e);
final String message =
"Application " + applicationId + " submission by user : " + user
+ " to queue : " + queueName + " failed : " + e.getMessage();
this.rmContext.getDispatcher().getEventHandler().handle(
new RMAppEvent(applicationId, RMAppEventType.APP_REJECTED,
message));
}
}
if (queue == null) { if (queue == null) {
String message = final String message =
"Application " + applicationId + " submitted by user " + user "Application " + applicationId + " submitted by user " + user
+ " to unknown queue: " + queueName; + " to unknown queue: " + queueName;
this.rmContext.getDispatcher().getEventHandler().handle( this.rmContext.getDispatcher().getEventHandler().handle(
new RMAppEvent(applicationId, RMAppEventType.APP_REJECTED, new RMAppEvent(applicationId, RMAppEventType.APP_REJECTED,
message)); message));
return; return;
} }
if (!(queue instanceof LeafQueue)) { if (!(queue instanceof LeafQueue)) {
String message = String message =
"Application " + applicationId + " submitted by user " + user "Application " + applicationId + " submitted by user : " + user
+ " to non-leaf queue: " + queueName; + " to non-leaf queue : " + queueName;
this.rmContext.getDispatcher().getEventHandler().handle(
new RMAppEvent(applicationId, RMAppEventType.APP_REJECTED,
message));
return;
} else if (queue instanceof AutoCreatedLeafQueue && queue
.getParent() instanceof ManagedParentQueue) {
//If queue already exists and auto-queue creation was not required,
//placement context should not be null
if (placementContext == null) {
String message =
"Application " + applicationId + " submission by user : " + user
+ " to specified queue : " + queueName + " is prohibited. "
+ "Verify automatic queue mapping for user exists in " +
QUEUE_MAPPING;
this.rmContext.getDispatcher().getEventHandler().handle(
new RMAppEvent(applicationId, RMAppEventType.APP_REJECTED,
message));
return;
// For a queue which exists already and
// not auto-created above, then its parent queue should match
// the parent queue specified in queue mapping
} else if (!queue.getParent().getQueueName().equals(
placementContext.getParentQueue())) {
String message =
"Auto created Leaf queue " + placementContext.getQueue() + " "
+ "already exists under queue : " + queue
.getParent().getQueuePath()
+ ".But Queue mapping configuration " +
CapacitySchedulerConfiguration.QUEUE_MAPPING + " has been "
+ "updated to a different parent queue : "
+ placementContext.getParentQueue()
+ " for the specified user : " + user;
this.rmContext.getDispatcher().getEventHandler().handle( this.rmContext.getDispatcher().getEventHandler().handle(
new RMAppEvent(applicationId, RMAppEventType.APP_REJECTED, new RMAppEvent(applicationId, RMAppEventType.APP_REJECTED,
message)); message));
return; return;
} }
}
// Submit to the queue // Submit to the queue
try { try {
queue.submitApplication(applicationId, user, queueName); queue.submitApplication(applicationId, user, queueName);
@ -1483,7 +1512,8 @@ public class CapacityScheduler extends
if (queueName != null) { if (queueName != null) {
if (!appAddedEvent.getIsAppRecovering()) { if (!appAddedEvent.getIsAppRecovering()) {
addApplication(appAddedEvent.getApplicationId(), queueName, addApplication(appAddedEvent.getApplicationId(), queueName,
appAddedEvent.getUser(), appAddedEvent.getApplicatonPriority()); appAddedEvent.getUser(), appAddedEvent.getApplicatonPriority(),
appAddedEvent.getPlacementContext());
} else { } else {
addApplicationOnRecovery(appAddedEvent.getApplicationId(), queueName, addApplicationOnRecovery(appAddedEvent.getApplicationId(), queueName,
appAddedEvent.getUser(), appAddedEvent.getApplicatonPriority()); appAddedEvent.getUser(), appAddedEvent.getApplicatonPriority());
@ -2001,7 +2031,8 @@ public class CapacityScheduler extends
try { try {
writeLock.lock(); writeLock.lock();
LeafQueue queue = this.queueManager.getAndCheckLeafQueue(inQueue); LeafQueue queue = this.queueManager.getAndCheckLeafQueue(inQueue);
ParentQueue parent = (ParentQueue) queue.getParent(); AbstractManagedParentQueue parent = (AbstractManagedParentQueue) queue
.getParent();
if (!(queue instanceof AutoCreatedLeafQueue)) { if (!(queue instanceof AutoCreatedLeafQueue)) {
throw new SchedulerDynamicEditException( throw new SchedulerDynamicEditException(
@ -2010,7 +2041,8 @@ public class CapacityScheduler extends
} }
if (parent == null if (parent == null
|| !(AbstractManagedParentQueue.class.isAssignableFrom(parent.getClass()))) { || !(AbstractManagedParentQueue.class.isAssignableFrom(
parent.getClass()))) {
throw new SchedulerDynamicEditException( throw new SchedulerDynamicEditException(
"The parent of AutoCreatedLeafQueue " + inQueue "The parent of AutoCreatedLeafQueue " + inQueue
+ " must be a PlanQueue/ManagedParentQueue"); + " must be a PlanQueue/ManagedParentQueue");
@ -2655,4 +2687,43 @@ public class CapacityScheduler extends
} }
return null; return null;
} }
private LeafQueue autoCreateLeafQueue(
ApplicationPlacementContext placementContext)
throws IOException, YarnException {
AutoCreatedLeafQueue autoCreatedLeafQueue = null;
String leafQueueName = placementContext.getQueue();
String parentQueueName = placementContext.getParentQueue();
if (!StringUtils.isEmpty(parentQueueName)) {
CSQueue parentQueue = getQueue(parentQueueName);
if (parentQueue != null && conf.isAutoCreateChildQueueEnabled(
parentQueue.getQueuePath())) {
ManagedParentQueue autoCreateEnabledParentQueue =
(ManagedParentQueue) parentQueue;
autoCreatedLeafQueue = new AutoCreatedLeafQueue(this, leafQueueName,
autoCreateEnabledParentQueue);
addQueue(autoCreatedLeafQueue);
//TODO - Set entitlement through capacity management policy
} else{
throw new SchedulerDynamicEditException(
"Could not auto-create leaf queue for " + leafQueueName
+ ". Queue mapping specifies an invalid parent queue "
+ "which does not exist "
+ parentQueueName);
}
} else{
throw new SchedulerDynamicEditException(
"Could not auto-create leaf queue for " + leafQueueName
+ ". Queue mapping does not specify"
+ " which parent queue it needs to be created under.");
}
return autoCreatedLeafQueue;
}
} }

View File

@ -907,6 +907,12 @@ public class CapacitySchedulerConfiguration extends ReservationSchedulerConfigur
DEFAULT_ENABLE_QUEUE_MAPPING_OVERRIDE); DEFAULT_ENABLE_QUEUE_MAPPING_OVERRIDE);
} }
@Private
@VisibleForTesting
public void setOverrideWithQueueMappings(boolean overrideWithQueueMappings) {
setBoolean(ENABLE_QUEUE_MAPPING_OVERRIDE, overrideWithQueueMappings);
}
/** /**
* Returns a collection of strings, trimming leading and trailing whitespeace * Returns a collection of strings, trimming leading and trailing whitespeace
* on each value * on each value
@ -981,6 +987,31 @@ public class CapacitySchedulerConfiguration extends ReservationSchedulerConfigur
return mappings; return mappings;
} }
@Private
@VisibleForTesting
public void setQueuePlacementRules(Collection<String> queuePlacementRules) {
if (queuePlacementRules == null) {
return;
}
String str = StringUtils.join(",", queuePlacementRules);
setStrings(YarnConfiguration.QUEUE_PLACEMENT_RULES, str);
}
@Private
@VisibleForTesting
public void setQueueMappings(List<QueueMapping> queueMappings) {
if (queueMappings == null) {
return;
}
List<String> queueMappingStrs = new ArrayList<>();
for (QueueMapping mapping : queueMappings) {
queueMappingStrs.add(mapping.toString());
}
setStrings(QUEUE_MAPPING, StringUtils.join(",", queueMappingStrs));
}
public boolean isReservable(String queue) { public boolean isReservable(String queue) {
boolean isReservable = boolean isReservable =
getBoolean(getQueuePrefix(queue) + IS_RESERVABLE, false); getBoolean(getQueuePrefix(queue) + IS_RESERVABLE, false);
@ -1523,4 +1554,126 @@ public class CapacitySchedulerConfiguration extends ReservationSchedulerConfigur
public void setDefaultLifetimePerQueue(String queue, long defaultLifetime) { public void setDefaultLifetimePerQueue(String queue, long defaultLifetime) {
setLong(getQueuePrefix(queue) + DEFAULT_LIFETIME_SUFFIX, defaultLifetime); setLong(getQueuePrefix(queue) + DEFAULT_LIFETIME_SUFFIX, defaultLifetime);
} }
@Private
public static final boolean DEFAULT_AUTO_CREATE_CHILD_QUEUE_ENABLED = false;
@Private
public static final String AUTO_CREATE_CHILD_QUEUE_ENABLED =
"auto-create-child-queue.enabled";
@Private
public static final String AUTO_CREATED_LEAF_QUEUE_TEMPLATE_PREFIX =
"leaf-queue-template";
@Private
public static final String AUTO_CREATE_QUEUE_MAX_QUEUES =
"auto-create-child-queue.max-queues";
@Private
public static final int DEFAULT_AUTO_CREATE_QUEUE_MAX_QUEUES = 1000;
/**
* If true, this queue will be created as a Parent Queue which Auto Created
* leaf child queues
*
* @param queuePath The queues path
* @return true if auto create is enabled for child queues else false. Default
* is false
*/
@Private
public boolean isAutoCreateChildQueueEnabled(String queuePath) {
boolean isAutoCreateEnabled = getBoolean(
getQueuePrefix(queuePath) + AUTO_CREATE_CHILD_QUEUE_ENABLED,
DEFAULT_AUTO_CREATE_CHILD_QUEUE_ENABLED);
return isAutoCreateEnabled;
}
@Private
@VisibleForTesting
public void setAutoCreateChildQueueEnabled(String queuePath,
boolean autoCreationEnabled) {
setBoolean(getQueuePrefix(queuePath) +
AUTO_CREATE_CHILD_QUEUE_ENABLED,
autoCreationEnabled);
}
/**
* Get the auto created leaf queue's template configuration prefix
* Leaf queue's template capacities are configured at the parent queue
*
* @param queuePath parent queue's path
* @return Config prefix for leaf queue template configurations
*/
@Private
public String getAutoCreatedQueueTemplateConfPrefix(String queuePath) {
return queuePath + DOT + AUTO_CREATED_LEAF_QUEUE_TEMPLATE_PREFIX;
}
@Private
public static final String FAIL_AUTO_CREATION_ON_EXCEEDING_CAPACITY =
"auto-create-child-queue.fail-on-exceeding-parent-capacity";
@Private
public static final boolean DEFAULT_FAIL_AUTO_CREATION_ON_EXCEEDING_CAPACITY =
false;
/**
* Fail further auto leaf queue creation when parent's guaranteed capacity is
* exceeded.
*
* @param queuePath the parent queue's path
* @return true if configured to fail else false
*/
@Private
public boolean getShouldFailAutoQueueCreationWhenGuaranteedCapacityExceeded(
String queuePath) {
boolean shouldFailAutoQueueCreationOnExceedingGuaranteedCapacity =
getBoolean(getQueuePrefix(queuePath)
+ FAIL_AUTO_CREATION_ON_EXCEEDING_CAPACITY,
DEFAULT_FAIL_AUTO_CREATION_ON_EXCEEDING_CAPACITY);
return shouldFailAutoQueueCreationOnExceedingGuaranteedCapacity;
}
@VisibleForTesting
@Private
public void setShouldFailAutoQueueCreationWhenGuaranteedCapacityExceeded(
String queuePath, boolean autoCreationEnabled) {
setBoolean(
getQueuePrefix(queuePath) +
FAIL_AUTO_CREATION_ON_EXCEEDING_CAPACITY,
autoCreationEnabled);
}
/**
* Get the max number of leaf queues that are allowed to be created under
* a parent queue
*
* @param queuePath the paret queue's path
* @return the max number of leaf queues allowed to be auto created
*/
@Private
public int getAutoCreatedQueuesMaxChildQueuesLimit(String queuePath) {
return getInt(getQueuePrefix(queuePath) +
AUTO_CREATE_QUEUE_MAX_QUEUES,
DEFAULT_AUTO_CREATE_QUEUE_MAX_QUEUES);
}
@Private
@VisibleForTesting
public void setAutoCreatedLeafQueueTemplateCapacity(String queuePath,
float val) {
String leafQueueConfPrefix = getAutoCreatedQueueTemplateConfPrefix(
queuePath);
setCapacity(leafQueueConfPrefix, val);
}
@Private
@VisibleForTesting
public void setAutoCreatedLeafQueueTemplateMaxCapacity(String queuePath,
float val) {
String leafQueueConfPrefix = getAutoCreatedQueueTemplateConfPrefix(
queuePath);
setMaximumCapacity(leafQueueConfPrefix, val);
}
} }

View File

@ -176,7 +176,7 @@ public class CapacitySchedulerQueueManager implements SchedulerQueueManager<
if (!csContext.isConfigurationMutable() || if (!csContext.isConfigurationMutable() ||
csContext.getRMContext().getHAServiceState() csContext.getRMContext().getHAServiceState()
!= HAServiceProtocol.HAServiceState.STANDBY) { != HAServiceProtocol.HAServiceState.STANDBY) {
// Ensure queue hiearchy in the new XML file is proper. // Ensure queue hierarchy in the new XML file is proper.
validateQueueHierarchy(queues, newQueues); validateQueueHierarchy(queues, newQueues);
} }
@ -216,11 +216,13 @@ public class CapacitySchedulerQueueManager implements SchedulerQueueManager<
Map<String, CSQueue> oldQueues, Map<String, CSQueue> oldQueues,
QueueHook hook) throws IOException { QueueHook hook) throws IOException {
CSQueue queue; CSQueue queue;
String fullQueueName = String fullQueueName = (parent == null) ?
(parent == null) ? queueName queueName :
: (parent.getQueuePath() + "." + queueName); (parent.getQueuePath() + "." + queueName);
String[] childQueueNames = conf.getQueues(fullQueueName); String[] childQueueNames = conf.getQueues(fullQueueName);
boolean isReservableQueue = conf.isReservable(fullQueueName); boolean isReservableQueue = conf.isReservable(fullQueueName);
boolean isAutoCreateEnabled = conf.isAutoCreateChildQueueEnabled(
fullQueueName);
if (childQueueNames == null || childQueueNames.length == 0) { if (childQueueNames == null || childQueueNames.length == 0) {
if (null == parent) { if (null == parent) {
throw new IllegalStateException( throw new IllegalStateException(
@ -229,8 +231,7 @@ public class CapacitySchedulerQueueManager implements SchedulerQueueManager<
// Check if the queue will be dynamically managed by the Reservation // Check if the queue will be dynamically managed by the Reservation
// system // system
if (isReservableQueue) { if (isReservableQueue) {
queue = queue = new PlanQueue(csContext, queueName, parent,
new PlanQueue(csContext, queueName, parent,
oldQueues.get(queueName)); oldQueues.get(queueName));
//initializing the "internal" default queue, for SLS compatibility //initializing the "internal" default queue, for SLS compatibility
@ -249,38 +250,46 @@ public class CapacitySchedulerQueueManager implements SchedulerQueueManager<
((PlanQueue) queue).setChildQueues(childQueues); ((PlanQueue) queue).setChildQueues(childQueues);
queues.put(defReservationId, resQueue); queues.put(defReservationId, resQueue);
} else { } else if (isAutoCreateEnabled) {
queue = queue = new ManagedParentQueue(csContext, queueName, parent,
new LeafQueue(csContext, queueName, parent,
oldQueues.get(queueName)); oldQueues.get(queueName));
} else{
queue = new LeafQueue(csContext, queueName, parent,
oldQueues.get(queueName));
// Used only for unit tests // Used only for unit tests
queue = hook.hook(queue); queue = hook.hook(queue);
} }
} else { } else{
if (isReservableQueue) { if (isReservableQueue) {
throw new IllegalStateException( throw new IllegalStateException(
"Only Leaf Queues can be reservable for " + queueName); "Only Leaf Queues can be reservable for " + queueName);
} }
ParentQueue parentQueue =
new ParentQueue(csContext, queueName, parent, ParentQueue parentQueue;
if (isAutoCreateEnabled) {
parentQueue = new ManagedParentQueue(csContext, queueName, parent,
oldQueues.get(queueName)); oldQueues.get(queueName));
} else{
parentQueue = new ParentQueue(csContext, queueName, parent,
oldQueues.get(queueName));
}
// Used only for unit tests // Used only for unit tests
queue = hook.hook(parentQueue); queue = hook.hook(parentQueue);
List<CSQueue> childQueues = new ArrayList<>(); List<CSQueue> childQueues = new ArrayList<>();
for (String childQueueName : childQueueNames) { for (String childQueueName : childQueueNames) {
CSQueue childQueue = CSQueue childQueue = parseQueue(csContext, conf, queue, childQueueName,
parseQueue(csContext, conf, queue, childQueueName,
queues, oldQueues, hook); queues, oldQueues, hook);
childQueues.add(childQueue); childQueues.add(childQueue);
} }
parentQueue.setChildQueues(childQueues); parentQueue.setChildQueues(childQueues);
} }
if (queue instanceof LeafQueue && queues.containsKey(queueName) if (queue instanceof LeafQueue && queues.containsKey(queueName) && queues
&& queues.get(queueName) instanceof LeafQueue) { .get(queueName) instanceof LeafQueue) {
throw new IOException("Two leaf queues were named " + queueName throw new IOException("Two leaf queues were named " + queueName
+ ". Leaf queue names must be distinct"); + ". Leaf queue names must be distinct");
} }
@ -312,25 +321,44 @@ public class CapacitySchedulerQueueManager implements SchedulerQueueManager<
if (oldQueue.getState() == QueueState.STOPPED) { if (oldQueue.getState() == QueueState.STOPPED) {
LOG.info("Deleting Queue " + queueName + ", as it is not" LOG.info("Deleting Queue " + queueName + ", as it is not"
+ " present in the modified capacity configuration xml"); + " present in the modified capacity configuration xml");
} else { } else{
throw new IOException(oldQueue.getQueuePath() + " is deleted from" throw new IOException(oldQueue.getQueuePath() + " is deleted from"
+ " the new capacity scheduler configuration, but the" + " the new capacity scheduler configuration, but the"
+ " queue is not yet in stopped state. " + " queue is not yet in stopped state. " + "Current State : "
+ "Current State : " + oldQueue.getState()); + oldQueue.getState());
} }
} else if (!oldQueue.getQueuePath().equals(newQueue.getQueuePath())) { } else if (!oldQueue.getQueuePath().equals(newQueue.getQueuePath())) {
//Queue's cannot be moved from one hierarchy to other //Queue's cannot be moved from one hierarchy to other
throw new IOException(queueName + " is moved from:" throw new IOException(
+ oldQueue.getQueuePath() + " to:" + newQueue.getQueuePath() queueName + " is moved from:" + oldQueue.getQueuePath() + " to:"
+ newQueue.getQueuePath()
+ " after refresh, which is not allowed."); + " after refresh, which is not allowed.");
} else if (oldQueue instanceof ParentQueue
&& !(oldQueue instanceof ManagedParentQueue)
&& newQueue instanceof ManagedParentQueue) {
throw new IOException(
"Can not convert parent queue: " + oldQueue.getQueuePath()
+ " to auto create enabled parent queue since "
+ "it could have other pre-configured queues which is not "
+ "supported");
} else if (oldQueue instanceof ManagedParentQueue
&& !(newQueue instanceof ManagedParentQueue)) {
throw new IOException(
"Cannot convert auto create enabled parent queue: " + oldQueue
.getQueuePath() + " to leaf queue. Please check "
+ " parent queue's configuration "
+ CapacitySchedulerConfiguration
.AUTO_CREATE_CHILD_QUEUE_ENABLED
+ " is set to true");
} else if (oldQueue instanceof LeafQueue } else if (oldQueue instanceof LeafQueue
&& newQueue instanceof ParentQueue) { && newQueue instanceof ParentQueue) {
if (oldQueue.getState() == QueueState.STOPPED) { if (oldQueue.getState() == QueueState.STOPPED) {
LOG.info("Converting the leaf queue: " + oldQueue.getQueuePath() LOG.info("Converting the leaf queue: " + oldQueue.getQueuePath()
+ " to parent queue."); + " to parent queue.");
} else { } else{
throw new IOException("Can not convert the leaf queue: " throw new IOException(
+ oldQueue.getQueuePath() + " to parent queue since " "Can not convert the leaf queue: " + oldQueue.getQueuePath()
+ " to parent queue since "
+ "it is not yet in stopped state. Current State : " + "it is not yet in stopped state. Current State : "
+ oldQueue.getState()); + oldQueue.getState());
} }
@ -352,6 +380,7 @@ public class CapacitySchedulerQueueManager implements SchedulerQueueManager<
*/ */
private void updateQueues(Map<String, CSQueue> existingQueues, private void updateQueues(Map<String, CSQueue> existingQueues,
Map<String, CSQueue> newQueues) { Map<String, CSQueue> newQueues) {
CapacitySchedulerConfiguration conf = csContext.getConfiguration();
for (Map.Entry<String, CSQueue> e : newQueues.entrySet()) { for (Map.Entry<String, CSQueue> e : newQueues.entrySet()) {
String queueName = e.getKey(); String queueName = e.getKey();
CSQueue queue = e.getValue(); CSQueue queue = e.getValue();
@ -363,7 +392,13 @@ public class CapacitySchedulerQueueManager implements SchedulerQueueManager<
.iterator(); itr.hasNext();) { .iterator(); itr.hasNext();) {
Map.Entry<String, CSQueue> e = itr.next(); Map.Entry<String, CSQueue> e = itr.next();
String queueName = e.getKey(); String queueName = e.getKey();
if (!newQueues.containsKey(queueName)) { CSQueue existingQueue = e.getValue();
//TODO - Handle case when auto create is disabled on parent queues
if (!newQueues.containsKey(queueName) && !(
existingQueue instanceof AutoCreatedLeafQueue && conf
.isAutoCreateChildQueueEnabled(
existingQueue.getParent().getQueuePath()))) {
itr.remove(); itr.remove();
} }
} }

View File

@ -0,0 +1,158 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler
.SchedulerDynamicEditException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
/**
* Auto Creation enabled Parent queue. This queue initially does not have any
* children to start with and all child
* leaf queues will be auto created. Currently this does not allow other
* pre-configured leaf or parent queues to
* co-exist along with auto-created leaf queues. The auto creation is limited
* to leaf queues currently.
*/
public class ManagedParentQueue extends AbstractManagedParentQueue {
private boolean shouldFailAutoCreationWhenGuaranteedCapacityExceeded = false;
private static final Logger LOG = LoggerFactory.getLogger(
ManagedParentQueue.class);
public ManagedParentQueue(final CapacitySchedulerContext cs,
final String queueName, final CSQueue parent, final CSQueue old)
throws IOException {
super(cs, queueName, parent, old);
String leafQueueTemplateConfPrefix = getLeafQueueConfigPrefix(
csContext.getConfiguration());
this.leafQueueTemplate = initializeLeafQueueConfigs(
leafQueueTemplateConfPrefix).build();
StringBuffer queueInfo = new StringBuffer();
queueInfo.append("Created Managed Parent Queue: ").append(queueName).append(
"]\nwith capacity: [").append(super.getCapacity()).append(
"]\nwith max capacity: [").append(super.getMaximumCapacity()).append(
"\nwith max apps: [").append(leafQueueTemplate.getMaxApps()).append(
"]\nwith max apps per user: [").append(
leafQueueTemplate.getMaxAppsPerUser()).append("]\nwith user limit: [")
.append(leafQueueTemplate.getUserLimit()).append(
"]\nwith user limit factor: [").append(
leafQueueTemplate.getUserLimitFactor()).append("].");
LOG.info(queueInfo.toString());
}
@Override
public void reinitialize(CSQueue newlyParsedQueue, Resource clusterResource)
throws IOException {
validate(newlyParsedQueue);
super.reinitialize(newlyParsedQueue, clusterResource);
String leafQueueTemplateConfPrefix = getLeafQueueConfigPrefix(
csContext.getConfiguration());
this.leafQueueTemplate = initializeLeafQueueConfigs(
leafQueueTemplateConfPrefix).build();
}
@Override
protected AutoCreatedLeafQueueTemplate.Builder initializeLeafQueueConfigs(
String queuePath) {
AutoCreatedLeafQueueTemplate.Builder leafQueueTemplate =
super.initializeLeafQueueConfigs(queuePath);
CapacitySchedulerConfiguration conf = csContext.getConfiguration();
String leafQueueTemplateConfPrefix = getLeafQueueConfigPrefix(conf);
QueueCapacities queueCapacities = new QueueCapacities(false);
CSQueueUtils.loadUpdateAndCheckCapacities(leafQueueTemplateConfPrefix,
csContext.getConfiguration(), queueCapacities, getQueueCapacities());
leafQueueTemplate.capacities(queueCapacities);
shouldFailAutoCreationWhenGuaranteedCapacityExceeded =
conf.getShouldFailAutoQueueCreationWhenGuaranteedCapacityExceeded(
getQueuePath());
return leafQueueTemplate;
}
protected void validate(final CSQueue newlyParsedQueue) throws IOException {
// Sanity check
if (!(newlyParsedQueue instanceof ManagedParentQueue) || !newlyParsedQueue
.getQueuePath().equals(getQueuePath())) {
throw new IOException(
"Trying to reinitialize " + getQueuePath() + " from "
+ newlyParsedQueue.getQueuePath());
}
}
@Override
public void addChildQueue(CSQueue childQueue)
throws SchedulerDynamicEditException {
try {
writeLock.lock();
if (childQueue == null || !(childQueue instanceof AutoCreatedLeafQueue)) {
throw new SchedulerDynamicEditException(
"Expected child queue to be an instance of AutoCreatedLeafQueue");
}
CapacitySchedulerConfiguration conf = csContext.getConfiguration();
ManagedParentQueue parentQueue =
(ManagedParentQueue) childQueue.getParent();
String leafQueueName = childQueue.getQueueName();
int maxQueues = conf.getAutoCreatedQueuesMaxChildQueuesLimit(
parentQueue.getQueuePath());
if (parentQueue.getChildQueues().size() >= maxQueues) {
throw new SchedulerDynamicEditException(
"Cannot auto create leaf queue " + leafQueueName + ".Max Child "
+ "Queue limit exceeded which is configured as : " + maxQueues
+ " and number of child queues is : " + parentQueue
.getChildQueues().size());
}
if (shouldFailAutoCreationWhenGuaranteedCapacityExceeded) {
if (getLeafQueueTemplate().getQueueCapacities().getAbsoluteCapacity()
+ parentQueue.sumOfChildAbsCapacities() > parentQueue
.getAbsoluteCapacity()) {
throw new SchedulerDynamicEditException(
"Cannot auto create leaf queue " + leafQueueName + ". Child "
+ "queues capacities have reached parent queue : "
+ parentQueue.getQueuePath() + " guaranteed capacity");
}
}
AutoCreatedLeafQueue leafQueue = (AutoCreatedLeafQueue) childQueue;
super.addChildQueue(leafQueue);
//TODO - refresh policy queue after capacity management is added
} finally {
writeLock.unlock();
}
}
private String getLeafQueueConfigPrefix(CapacitySchedulerConfiguration conf) {
return conf.getAutoCreatedQueueTemplateConfPrefix(getQueuePath());
}
}

View File

@ -1081,17 +1081,4 @@ public class ParentQueue extends AbstractCSQueue {
public QueueOrderingPolicy getQueueOrderingPolicy() { public QueueOrderingPolicy getQueueOrderingPolicy() {
return queueOrderingPolicy; return queueOrderingPolicy;
} }
protected float sumOfChildCapacities() {
try {
writeLock.lock();
float ret = 0;
for (CSQueue l : childQueues) {
ret += l.getCapacity();
}
return ret;
} finally {
writeLock.unlock();
}
}
} }

View File

@ -40,6 +40,19 @@ public class PlanQueue extends AbstractManagedParentQueue {
public PlanQueue(CapacitySchedulerContext cs, String queueName, public PlanQueue(CapacitySchedulerContext cs, String queueName,
CSQueue parent, CSQueue old) throws IOException { CSQueue parent, CSQueue old) throws IOException {
super(cs, queueName, parent, old); super(cs, queueName, parent, old);
this.leafQueueTemplate = initializeLeafQueueConfigs(getQueuePath()).build();
StringBuffer queueInfo = new StringBuffer();
queueInfo.append("Created Plan Queue: ").append(queueName).append(
"]\nwith capacity: [").append(super.getCapacity()).append(
"]\nwith max capacity: [").append(super.getMaximumCapacity()).append(
"\nwith max apps: [").append(leafQueueTemplate.getMaxApps()).append(
"]\nwith max apps per user: [").append(
leafQueueTemplate.getMaxAppsPerUser()).append("]\nwith user limit: [")
.append(leafQueueTemplate.getUserLimit()).append(
"]\nwith user limit factor: [").append(
leafQueueTemplate.getUserLimitFactor()).append("].");
LOG.info(queueInfo.toString());
} }
@Override @Override
@ -47,17 +60,21 @@ public class PlanQueue extends AbstractManagedParentQueue {
throws IOException { throws IOException {
validate(newlyParsedQueue); validate(newlyParsedQueue);
super.reinitialize(newlyParsedQueue, clusterResource); super.reinitialize(newlyParsedQueue, clusterResource);
this.leafQueueTemplate = initializeLeafQueueConfigs(getQueuePath()).build();
} }
@Override @Override
protected void initializeLeafQueueConfigs() { protected AutoCreatedLeafQueueTemplate.Builder initializeLeafQueueConfigs
String queuePath = super.getQueuePath(); (String queuePath) {
AutoCreatedLeafQueueTemplate.Builder leafQueueTemplate = super
.initializeLeafQueueConfigs
(queuePath);
showReservationsAsQueues = csContext.getConfiguration() showReservationsAsQueues = csContext.getConfiguration()
.getShowReservationAsQueues(queuePath); .getShowReservationAsQueues(queuePath);
super.initializeLeafQueueConfigs(); return leafQueueTemplate;
} }
private void validate(final CSQueue newlyParsedQueue) throws IOException { protected void validate(final CSQueue newlyParsedQueue) throws IOException {
// Sanity check // Sanity check
if (!(newlyParsedQueue instanceof PlanQueue) || !newlyParsedQueue if (!(newlyParsedQueue instanceof PlanQueue) || !newlyParsedQueue
.getQueuePath().equals(getQueuePath())) { .getQueuePath().equals(getQueuePath())) {

View File

@ -6,9 +6,9 @@
* to you under the Apache License, Version 2.0 (the * to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance * "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at * with the License. You may obtain a copy of the License at
* * <p>
* http://www.apache.org/licenses/LICENSE-2.0 * http://www.apache.org/licenses/LICENSE-2.0
* * <p>
* Unless required by applicable law or agreed to in writing, software * Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, * distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@ -22,6 +22,8 @@ import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext; import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.ReservationId; import org.apache.hadoop.yarn.api.records.ReservationId;
import org.apache.hadoop.yarn.server.resourcemanager.placement
.ApplicationPlacementContext;
public class AppAddedSchedulerEvent extends SchedulerEvent { public class AppAddedSchedulerEvent extends SchedulerEvent {
@ -31,15 +33,23 @@ public class AppAddedSchedulerEvent extends SchedulerEvent {
private final ReservationId reservationID; private final ReservationId reservationID;
private final boolean isAppRecovering; private final boolean isAppRecovering;
private final Priority appPriority; private final Priority appPriority;
private final ApplicationPlacementContext placementContext;
public AppAddedSchedulerEvent(ApplicationId applicationId, String queue, public AppAddedSchedulerEvent(ApplicationId applicationId, String queue,
String user) { String user) {
this(applicationId, queue, user, false, null, Priority.newInstance(0)); this(applicationId, queue, user, false, null, Priority.newInstance(0),
null);
}
public AppAddedSchedulerEvent(ApplicationId applicationId, String queue,
String user, ApplicationPlacementContext placementContext) {
this(applicationId, queue, user, false, null, Priority.newInstance(0),
placementContext);
} }
public AppAddedSchedulerEvent(ApplicationId applicationId, String queue, public AppAddedSchedulerEvent(ApplicationId applicationId, String queue,
String user, ReservationId reservationID, Priority appPriority) { String user, ReservationId reservationID, Priority appPriority) {
this(applicationId, queue, user, false, reservationID, appPriority); this(applicationId, queue, user, false, reservationID, appPriority, null);
} }
public AppAddedSchedulerEvent(String user, public AppAddedSchedulerEvent(String user,
@ -47,12 +57,20 @@ public class AppAddedSchedulerEvent extends SchedulerEvent {
Priority appPriority) { Priority appPriority) {
this(submissionContext.getApplicationId(), submissionContext.getQueue(), this(submissionContext.getApplicationId(), submissionContext.getQueue(),
user, isAppRecovering, submissionContext.getReservationID(), user, isAppRecovering, submissionContext.getReservationID(),
appPriority); appPriority, null);
}
public AppAddedSchedulerEvent(String user,
ApplicationSubmissionContext submissionContext, boolean isAppRecovering,
Priority appPriority, ApplicationPlacementContext placementContext) {
this(submissionContext.getApplicationId(), submissionContext.getQueue(),
user, isAppRecovering, submissionContext.getReservationID(),
appPriority, placementContext);
} }
public AppAddedSchedulerEvent(ApplicationId applicationId, String queue, public AppAddedSchedulerEvent(ApplicationId applicationId, String queue,
String user, boolean isAppRecovering, ReservationId reservationID, String user, boolean isAppRecovering, ReservationId reservationID,
Priority appPriority) { Priority appPriority, ApplicationPlacementContext placementContext) {
super(SchedulerEventType.APP_ADDED); super(SchedulerEventType.APP_ADDED);
this.applicationId = applicationId; this.applicationId = applicationId;
this.queue = queue; this.queue = queue;
@ -60,6 +78,7 @@ public class AppAddedSchedulerEvent extends SchedulerEvent {
this.reservationID = reservationID; this.reservationID = reservationID;
this.isAppRecovering = isAppRecovering; this.isAppRecovering = isAppRecovering;
this.appPriority = appPriority; this.appPriority = appPriority;
this.placementContext = placementContext;
} }
public ApplicationId getApplicationId() { public ApplicationId getApplicationId() {
@ -85,4 +104,8 @@ public class AppAddedSchedulerEvent extends SchedulerEvent {
public Priority getApplicatonPriority() { public Priority getApplicatonPriority() {
return appPriority; return appPriority;
} }
public ApplicationPlacementContext getPlacementContext() {
return placementContext;
}
} }

View File

@ -44,7 +44,6 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeysPublic; import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
import org.apache.hadoop.io.DataOutputBuffer; import org.apache.hadoop.io.DataOutputBuffer;
import org.apache.hadoop.security.AccessControlException; import org.apache.hadoop.security.AccessControlException;
import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.service.Service; import org.apache.hadoop.service.Service;
import org.apache.hadoop.yarn.MockApps; import org.apache.hadoop.yarn.MockApps;
@ -70,6 +69,7 @@ import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import org.apache.hadoop.yarn.server.resourcemanager.ahs.RMApplicationHistoryWriter; import org.apache.hadoop.yarn.server.resourcemanager.ahs.RMApplicationHistoryWriter;
import org.apache.hadoop.yarn.server.resourcemanager.metrics.SystemMetricsPublisher; import org.apache.hadoop.yarn.server.resourcemanager.metrics.SystemMetricsPublisher;
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager; import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
import org.apache.hadoop.yarn.server.resourcemanager.placement.ApplicationPlacementContext;
import org.apache.hadoop.yarn.server.resourcemanager.placement.PlacementManager; import org.apache.hadoop.yarn.server.resourcemanager.placement.PlacementManager;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore; import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.MockRMApp; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.MockRMApp;
@ -84,7 +84,6 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptI
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.ContainerAllocationExpirer; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.ContainerAllocationExpirer;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.security.ClientToAMTokenSecretManagerInRM; import org.apache.hadoop.yarn.server.resourcemanager.security.ClientToAMTokenSecretManagerInRM;
import org.apache.hadoop.yarn.server.resourcemanager.timelineservice.RMTimelineCollectorManager; import org.apache.hadoop.yarn.server.resourcemanager.timelineservice.RMTimelineCollectorManager;
import org.apache.hadoop.yarn.server.security.ApplicationACLsManager; import org.apache.hadoop.yarn.server.security.ApplicationACLsManager;
@ -861,23 +860,27 @@ public class TestAppManager{
public void testRMAppSubmitWithQueueChanged() throws Exception { public void testRMAppSubmitWithQueueChanged() throws Exception {
// Setup a PlacementManager returns a new queue // Setup a PlacementManager returns a new queue
PlacementManager placementMgr = mock(PlacementManager.class); PlacementManager placementMgr = mock(PlacementManager.class);
doAnswer(new Answer<Void>() { doAnswer(new Answer<ApplicationPlacementContext>() {
@Override @Override
public Void answer(InvocationOnMock invocation) throws Throwable { public ApplicationPlacementContext answer(InvocationOnMock invocation)
ApplicationSubmissionContext ctx = throws Throwable {
(ApplicationSubmissionContext) invocation.getArguments()[0]; return new ApplicationPlacementContext("newQueue");
ctx.setQueue("newQueue");
return null;
} }
}).when(placementMgr).placeApplication(any(ApplicationSubmissionContext.class), }).when(placementMgr).placeApplication(
any(String.class)); any(ApplicationSubmissionContext.class), any(String.class));
rmContext.setQueuePlacementManager(placementMgr); rmContext.setQueuePlacementManager(placementMgr);
asContext.setQueue("oldQueue"); asContext.setQueue("oldQueue");
appMonitor.submitApplication(asContext, "test"); appMonitor.submitApplication(asContext, "test");
RMApp app = rmContext.getRMApps().get(appId); RMApp app = rmContext.getRMApps().get(appId);
RMAppEvent event = new RMAppEvent(appId, RMAppEventType.START);
rmContext.getRMApps().get(appId).handle(event);
event = new RMAppEvent(appId, RMAppEventType.APP_NEW_SAVED);
rmContext.getRMApps().get(appId).handle(event);
Assert.assertNotNull("app is null", app); Assert.assertNotNull("app is null", app);
Assert.assertEquals("newQueue", asContext.getQueue()); Assert.assertEquals("newQueue", asContext.getQueue());

View File

@ -52,14 +52,14 @@ public class TestUserGroupMappingPlacementRule {
private void verifyQueueMapping(QueueMapping queueMapping, String inputUser, private void verifyQueueMapping(QueueMapping queueMapping, String inputUser,
String inputQueue, String expectedQueue, boolean overwrite) throws YarnException { String inputQueue, String expectedQueue, boolean overwrite) throws YarnException {
Groups groups = new Groups(conf); Groups groups = new Groups(conf);
UserGroupMappingPlacementRule rule = UserGroupMappingPlacementRule rule = new UserGroupMappingPlacementRule(
new UserGroupMappingPlacementRule(overwrite, Arrays.asList(queueMapping), overwrite, Arrays.asList(queueMapping), groups);
groups); ApplicationSubmissionContext asc = Records.newRecord(
ApplicationSubmissionContext asc = ApplicationSubmissionContext.class);
Records.newRecord(ApplicationSubmissionContext.class);
asc.setQueue(inputQueue); asc.setQueue(inputQueue);
String queue = rule.getQueueForApp(asc, inputUser); ApplicationPlacementContext ctx = rule.getPlacementForApp(asc, inputUser);
Assert.assertEquals(expectedQueue, queue); Assert.assertEquals(expectedQueue,
ctx != null ? ctx.getQueue() : inputQueue);
} }
@Test @Test

View File

@ -803,6 +803,7 @@ public class TestSchedulerUtils {
Map<ApplicationId, SchedulerApplication<SchedulerApplicationAttempt>> applications, Map<ApplicationId, SchedulerApplication<SchedulerApplicationAttempt>> applications,
EventHandler<SchedulerEvent> handler, String queueName) EventHandler<SchedulerEvent> handler, String queueName)
throws Exception { throws Exception {
ApplicationId appId = ApplicationId appId =
ApplicationId.newInstance(System.currentTimeMillis(), 1); ApplicationId.newInstance(System.currentTimeMillis(), 1);
AppAddedSchedulerEvent appAddedEvent = AppAddedSchedulerEvent appAddedEvent =

View File

@ -103,6 +103,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.TestAMAuthorization.MockRMW
import org.apache.hadoop.yarn.server.resourcemanager.TestAMAuthorization.MyContainerManager; import org.apache.hadoop.yarn.server.resourcemanager.TestAMAuthorization.MyContainerManager;
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.NullRMNodeLabelsManager; import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.NullRMNodeLabelsManager;
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager; import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
import org.apache.hadoop.yarn.server.resourcemanager.placement.UserGroupMappingPlacementRule;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppImpl; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppImpl;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppMetrics; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppMetrics;
@ -904,7 +905,7 @@ public class TestCapacityScheduler {
(B3_CAPACITY/100.0f) * capB, 1.0f, 1.0f); (B3_CAPACITY/100.0f) * capB, 1.0f, 1.0f);
} }
private void checkQueueCapacity(CSQueue q, float expectedCapacity, void checkQueueCapacity(CSQueue q, float expectedCapacity,
float expectedAbsCapacity, float expectedMaxCapacity, float expectedAbsCapacity, float expectedMaxCapacity,
float expectedAbsMaxCapacity) { float expectedAbsMaxCapacity) {
final float epsilon = 1e-5f; final float epsilon = 1e-5f;
@ -917,7 +918,7 @@ public class TestCapacityScheduler {
q.getAbsoluteMaximumCapacity(), epsilon); q.getAbsoluteMaximumCapacity(), epsilon);
} }
private CSQueue findQueue(CSQueue root, String queuePath) { CSQueue findQueue(CSQueue root, String queuePath) {
if (root.getQueuePath().equals(queuePath)) { if (root.getQueuePath().equals(queuePath)) {
return root; return root;
} }
@ -1396,7 +1397,6 @@ public class TestCapacityScheduler {
AbstractYarnScheduler<SchedulerApplicationAttempt, SchedulerNode> cs = AbstractYarnScheduler<SchedulerApplicationAttempt, SchedulerNode> cs =
(AbstractYarnScheduler<SchedulerApplicationAttempt, SchedulerNode>) rm (AbstractYarnScheduler<SchedulerApplicationAttempt, SchedulerNode>) rm
.getResourceScheduler(); .getResourceScheduler();
SchedulerApplication<SchedulerApplicationAttempt> app = SchedulerApplication<SchedulerApplicationAttempt> app =
TestSchedulerUtils.verifyAppAddedAndRemovedFromScheduler( TestSchedulerUtils.verifyAppAddedAndRemovedFromScheduler(
cs.getSchedulerApplications(), cs, "a1"); cs.getSchedulerApplications(), cs, "a1");

View File

@ -0,0 +1,794 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.QueueState;
import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.event.AsyncDispatcher;
import org.apache.hadoop.yarn.event.Event;
import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.factories.RecordFactory;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import org.apache.hadoop.yarn.server.resourcemanager.MockNodes;
import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import org.apache.hadoop.yarn.server.resourcemanager.RMContextImpl;
import org.apache.hadoop.yarn.server.resourcemanager.placement.ApplicationPlacementContext;
import org.apache.hadoop.yarn.server.resourcemanager.placement.UserGroupMappingPlacementRule;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEventType;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ContainerUpdates;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerDynamicEditException;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.QueueEntitlement;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAddedSchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAttemptAddedSchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeAddedSchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.security.ClientToAMTokenSecretManagerInRM;
import org.apache.hadoop.yarn.server.resourcemanager.security.NMTokenSecretManagerInRM;
import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager;
import org.apache.hadoop.yarn.server.utils.BuilderUtils;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import static org.apache.hadoop.yarn.server.resourcemanager.placement.UserGroupMappingPlacementRule.CURRENT_USER_MAPPING;
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CSQueueUtils.EPSILON;
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration.DOT;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
/**
* Tests for creation and reinitilization of auto created leaf queues
* under a ManagedParentQueue.
*/
public class TestCapacitySchedulerAutoQueueCreation {
private static final Log LOG = LogFactory.getLog(TestCapacityScheduler.class);
private final int GB = 1024;
private final static ContainerUpdates NULL_UPDATE_REQUESTS =
new ContainerUpdates();
private static final String A = CapacitySchedulerConfiguration.ROOT + ".a";
private static final String B = CapacitySchedulerConfiguration.ROOT + ".b";
private static final String C = CapacitySchedulerConfiguration.ROOT + ".c";
private static final String D = CapacitySchedulerConfiguration.ROOT + ".d";
private static final String A1 = A + ".a1";
private static final String A2 = A + ".a2";
private static final String B1 = B + ".b1";
private static final String B2 = B + ".b2";
private static final String B3 = B + ".b3";
private static final String C1 = C + ".c1";
private static final String C2 = C + ".c2";
private static final String C3 = C + ".c3";
private static float A_CAPACITY = 20f;
private static float B_CAPACITY = 40f;
private static float C_CAPACITY = 20f;
private static float D_CAPACITY = 20f;
private static float A1_CAPACITY = 30;
private static float A2_CAPACITY = 70;
private static float B1_CAPACITY = 60f;
private static float B2_CAPACITY = 20f;
private static float B3_CAPACITY = 20f;
private static float C1_CAPACITY = 20f;
private static float C2_CAPACITY = 20f;
private static String USER = "user_";
private static String USER0 = USER + 0;
private static String USER2 = USER + 2;
private static String PARENT_QUEUE = "c";
private MockRM mockRM = null;
private CapacityScheduler cs;
private final TestCapacityScheduler tcs = new TestCapacityScheduler();
private static SpyDispatcher dispatcher;
private static EventHandler<Event> rmAppEventEventHandler;
private static class SpyDispatcher extends AsyncDispatcher {
private static BlockingQueue<Event> eventQueue =
new LinkedBlockingQueue<>();
private static class SpyRMAppEventHandler implements EventHandler<Event> {
public void handle(Event event) {
eventQueue.add(event);
}
}
@Override
protected void dispatch(Event event) {
eventQueue.add(event);
}
@Override
public EventHandler<Event> getEventHandler() {
return rmAppEventEventHandler;
}
void spyOnNextEvent(Event expectedEvent, long timeout)
throws InterruptedException {
Event event = eventQueue.poll(timeout, TimeUnit.MILLISECONDS);
assertEquals(expectedEvent.getType(), event.getType());
assertEquals(expectedEvent.getClass(), event.getClass());
}
}
@Before
public void setUp() throws Exception {
CapacitySchedulerConfiguration conf = new CapacitySchedulerConfiguration();
setupQueueConfiguration(conf);
conf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class,
ResourceScheduler.class);
List<String> queuePlacementRules = new ArrayList<>();
queuePlacementRules.add(YarnConfiguration.USER_GROUP_PLACEMENT_RULE);
conf.setQueuePlacementRules(queuePlacementRules);
setupQueueMappings(conf);
mockRM = new MockRM(conf);
cs = (CapacityScheduler) mockRM.getResourceScheduler();
dispatcher = new SpyDispatcher();
rmAppEventEventHandler = new SpyDispatcher.SpyRMAppEventHandler();
dispatcher.register(RMAppEventType.class, rmAppEventEventHandler);
cs.updatePlacementRules();
mockRM.start();
cs.start();
}
private CapacitySchedulerConfiguration setupQueueMappings(
CapacitySchedulerConfiguration conf) {
//set queue mapping
List<UserGroupMappingPlacementRule.QueueMapping> queueMappings =
new ArrayList<>();
for (int i = 0; i <= 3; i++) {
//Set C as parent queue name for auto queue creation
UserGroupMappingPlacementRule.QueueMapping userQueueMapping =
new UserGroupMappingPlacementRule.QueueMapping(
UserGroupMappingPlacementRule.QueueMapping.MappingType.USER,
USER + i, getQueueMapping(PARENT_QUEUE, USER + i));
queueMappings.add(userQueueMapping);
}
conf.setQueueMappings(queueMappings);
//override with queue mappings
conf.setOverrideWithQueueMappings(true);
return conf;
}
/**
* @param conf, to be modified
* @return, CS configuration which has C
* as an auto creation enabled parent queue
* <p>
* root
* / \ \ \
* a b c d
* / \ / | \
* a1 a2 b1 b2 b3
*/
private CapacitySchedulerConfiguration setupQueueConfiguration(
CapacitySchedulerConfiguration conf) {
//setup new queues with one of them auto enabled
// Define top-level queues
// Set childQueue for root
conf.setQueues(CapacitySchedulerConfiguration.ROOT,
new String[] { "a", "b", "c", "d" });
conf.setCapacity(A, A_CAPACITY);
conf.setCapacity(B, B_CAPACITY);
conf.setCapacity(C, C_CAPACITY);
conf.setCapacity(D, D_CAPACITY);
// Define 2nd-level queues
conf.setQueues(A, new String[] { "a1", "a2" });
conf.setCapacity(A1, A1_CAPACITY);
conf.setUserLimitFactor(A1, 100.0f);
conf.setCapacity(A2, A2_CAPACITY);
conf.setUserLimitFactor(A2, 100.0f);
conf.setQueues(B, new String[] { "b1", "b2", "b3" });
conf.setCapacity(B1, B1_CAPACITY);
conf.setUserLimitFactor(B1, 100.0f);
conf.setCapacity(B2, B2_CAPACITY);
conf.setUserLimitFactor(B2, 100.0f);
conf.setCapacity(B3, B3_CAPACITY);
conf.setUserLimitFactor(B3, 100.0f);
conf.setUserLimitFactor(C, 1.0f);
conf.setAutoCreateChildQueueEnabled(C, true);
//Setup leaf queue template configs
conf.setAutoCreatedLeafQueueTemplateCapacity(C, 50.0f);
conf.setAutoCreatedLeafQueueTemplateMaxCapacity(C, 100.0f);
LOG.info("Setup " + C + " as an auto leaf creation enabled parent queue");
conf.setUserLimitFactor(D, 1.0f);
conf.setAutoCreateChildQueueEnabled(D, true);
//Setup leaf queue template configs
conf.setAutoCreatedLeafQueueTemplateCapacity(D, 10.0f);
conf.setAutoCreatedLeafQueueTemplateMaxCapacity(D, 100.0f);
LOG.info("Setup " + D + " as an auto leaf creation enabled parent queue");
return conf;
}
@After
public void tearDown() throws Exception {
if (mockRM != null) {
mockRM.stop();
}
}
@Test(timeout = 10000)
public void testAutoCreateLeafQueueCreation() throws Exception {
try {
// submit an app
submitApp(cs, USER0, USER0, PARENT_QUEUE);
// check preconditions
List<ApplicationAttemptId> appsInC = cs.getAppsInQueue(PARENT_QUEUE);
assertEquals(1, appsInC.size());
assertNotNull(cs.getQueue(USER0));
AutoCreatedLeafQueue autoCreatedLeafQueue =
(AutoCreatedLeafQueue) cs.getQueue(USER0);
ManagedParentQueue parentQueue = (ManagedParentQueue) cs.getQueue(
PARENT_QUEUE);
assertEquals(parentQueue, autoCreatedLeafQueue.getParent());
validateCapacities(autoCreatedLeafQueue);
} finally {
cleanupQueue(USER0);
}
}
@Test
public void testReinitializeStoppedAutoCreatedLeafQueue() throws Exception {
try {
String host = "127.0.0.1";
RMNode node = MockNodes.newNodeInfo(0, MockNodes.newResource(4 * GB), 1,
host);
cs.handle(new NodeAddedSchedulerEvent(node));
// submit an app
RMApp app = mockRM.submitApp(GB, "test-auto-queue-creation-1", USER0,
null, USER0);
// check preconditions
List<ApplicationAttemptId> appsInC = cs.getAppsInQueue(PARENT_QUEUE);
assertEquals(1, appsInC.size());
assertNotNull(cs.getQueue(USER0));
AutoCreatedLeafQueue autoCreatedLeafQueue =
(AutoCreatedLeafQueue) cs.getQueue(USER0);
ManagedParentQueue parentQueue = (ManagedParentQueue) cs.getQueue(
PARENT_QUEUE);
assertEquals(parentQueue, autoCreatedLeafQueue.getParent());
validateCapacities(autoCreatedLeafQueue);
ApplicationAttemptId appAttemptId = appsInC.get(0);
Priority priority = TestUtils.createMockPriority(1);
RecordFactory recordFactory = RecordFactoryProvider.getRecordFactory(
null);
ResourceRequest r1 = TestUtils.createResourceRequest(ResourceRequest.ANY,
1 * GB, 1, true, priority, recordFactory);
cs.allocate(appAttemptId, Collections.<ResourceRequest>singletonList(r1),
Collections.<ContainerId>emptyList(), Collections.singletonList(host),
null, NULL_UPDATE_REQUESTS);
//And this will result in container assignment for app1
CapacityScheduler.schedule(cs);
//change state to draining
autoCreatedLeafQueue.stopQueue();
cs.killAllAppsInQueue(USER0);
mockRM.waitForState(appAttemptId, RMAppAttemptState.KILLED);
mockRM.waitForState(appAttemptId.getApplicationId(), RMAppState.KILLED);
//change state to stopped
autoCreatedLeafQueue.stopQueue();
assertEquals(QueueState.STOPPED,
autoCreatedLeafQueue.getQueueInfo().getQueueState());
cs.reinitialize(cs.getConf(), mockRM.getRMContext());
AutoCreatedLeafQueue leafQueue = (AutoCreatedLeafQueue) cs.getQueue(
USER0);
validateCapacities(leafQueue);
} finally {
cleanupQueue(USER0);
}
}
@Test
public void testRefreshQueuesWithAutoCreatedLeafQueues() throws Exception {
MockRM newMockRM = setupSchedulerInstance();
try {
CapacityScheduler newCS =
(CapacityScheduler) newMockRM.getResourceScheduler();
CapacitySchedulerConfiguration conf = newCS.getConfiguration();
// Test add one auto created queue dynamically and manually modify
// capacity
ManagedParentQueue parentQueue = (ManagedParentQueue) newCS.getQueue("c");
AutoCreatedLeafQueue c1 = new AutoCreatedLeafQueue(newCS, "c1",
parentQueue);
newCS.addQueue(c1);
c1.setEntitlement(new QueueEntitlement(C1_CAPACITY / 100, 1f));
// Test add another auto created queue and use setEntitlement to modify
// capacity
AutoCreatedLeafQueue c2 = new AutoCreatedLeafQueue(newCS, "c2",
(ManagedParentQueue) newCS.getQueue("c"));
newCS.addQueue(c2);
newCS.setEntitlement("c2", new QueueEntitlement(C2_CAPACITY / 100, 1f));
// Verify all allocations match
checkQueueCapacities(newCS, C_CAPACITY, D_CAPACITY);
// Reinitialize and verify all dynamic queued survived
conf.setCapacity(A, 20f);
conf.setCapacity(B, 20f);
conf.setCapacity(C, 40f);
conf.setCapacity(D, 20f);
newCS.reinitialize(conf, newMockRM.getRMContext());
checkQueueCapacities(newCS, 40f, 20f);
//chnage parent template configs and reinitialize
conf.setAutoCreatedLeafQueueTemplateCapacity(C, 30.0f);
conf.setAutoCreatedLeafQueueTemplateMaxCapacity(C, 100.0f);
newCS.reinitialize(conf, newMockRM.getRMContext());
ManagedParentQueue c = (ManagedParentQueue) newCS.getQueue("c");
AutoCreatedLeafQueue c3 = new AutoCreatedLeafQueue(newCS, "c3", c);
newCS.addQueue(c3);
AbstractManagedParentQueue.AutoCreatedLeafQueueTemplate
leafQueueTemplate = parentQueue.getLeafQueueTemplate();
QueueCapacities cap = leafQueueTemplate.getQueueCapacities();
c3.setEntitlement(
new QueueEntitlement(cap.getCapacity(), cap.getMaximumCapacity()));
newCS.reinitialize(conf, newMockRM.getRMContext());
checkQueueCapacities(newCS, 40f, 20f);
} finally {
if (newMockRM != null) {
((CapacityScheduler) newMockRM.getResourceScheduler()).stop();
newMockRM.stop();
}
}
}
@Test
public void testConvertAutoCreateDisabledOnManagedParentQueueFails()
throws Exception {
CapacityScheduler newCS = new CapacityScheduler();
try {
CapacitySchedulerConfiguration newConf =
new CapacitySchedulerConfiguration();
setupQueueConfiguration(newConf);
newConf.setAutoCreateChildQueueEnabled(C, false);
newCS.setConf(new YarnConfiguration());
newCS.setRMContext(mockRM.getRMContext());
newCS.init(cs.getConf());
newCS.start();
newCS.reinitialize(newConf,
new RMContextImpl(null, null, null, null, null, null,
new RMContainerTokenSecretManager(newConf),
new NMTokenSecretManagerInRM(newConf),
new ClientToAMTokenSecretManagerInRM(), null));
} catch (IOException e) {
//expected exception
} finally {
newCS.stop();
}
}
@Test
public void testConvertLeafQueueToParentQueueWithAutoCreate()
throws Exception {
CapacityScheduler newCS = new CapacityScheduler();
try {
CapacitySchedulerConfiguration newConf =
new CapacitySchedulerConfiguration();
setupQueueConfiguration(newConf);
newConf.setAutoCreatedLeafQueueTemplateCapacity(A1, A1_CAPACITY / 10);
newConf.setAutoCreateChildQueueEnabled(A1, true);
newCS.setConf(new YarnConfiguration());
newCS.setRMContext(mockRM.getRMContext());
newCS.init(cs.getConf());
newCS.start();
final LeafQueue a1Queue = (LeafQueue) newCS.getQueue("a1");
a1Queue.stopQueue();
newCS.reinitialize(newConf,
new RMContextImpl(null, null, null, null, null, null,
new RMContainerTokenSecretManager(newConf),
new NMTokenSecretManagerInRM(newConf),
new ClientToAMTokenSecretManagerInRM(), null));
} finally {
newCS.stop();
}
}
@Test
public void testConvertFailsFromParentQueueToManagedParentQueue()
throws Exception {
CapacityScheduler newCS = new CapacityScheduler();
try {
CapacitySchedulerConfiguration newConf =
new CapacitySchedulerConfiguration();
setupQueueConfiguration(newConf);
newConf.setAutoCreatedLeafQueueTemplateCapacity(A, A_CAPACITY / 10);
newConf.setAutoCreateChildQueueEnabled(A, true);
newCS.setConf(new YarnConfiguration());
newCS.setRMContext(mockRM.getRMContext());
newCS.init(cs.getConf());
newCS.start();
final ParentQueue a1Queue = (ParentQueue) newCS.getQueue("a");
a1Queue.stopQueue();
newCS.reinitialize(newConf,
new RMContextImpl(null, null, null, null, null, null,
new RMContainerTokenSecretManager(newConf),
new NMTokenSecretManagerInRM(newConf),
new ClientToAMTokenSecretManagerInRM(), null));
fail("Expected exception while converting a parent queue to"
+ " an auto create enabled parent queue");
} catch (IOException e) {
//expected exception
} finally {
newCS.stop();
}
}
@Test(timeout = 10000)
public void testAutoCreateLeafQueueFailsWithNoQueueMapping()
throws Exception {
final String INVALID_USER = "invalid_user";
// submit an app under a different queue name which does not exist
// and queue mapping does not exist for this user
RMApp app = mockRM.submitApp(GB, "app", INVALID_USER, null, INVALID_USER,
false);
mockRM.drainEvents();
mockRM.waitForState(app.getApplicationId(), RMAppState.FAILED);
assertEquals(RMAppState.FAILED, app.getState());
}
private void validateCapacities(AutoCreatedLeafQueue autoCreatedLeafQueue) {
assertEquals(autoCreatedLeafQueue.getCapacity(), 0.0f, EPSILON);
assertEquals(autoCreatedLeafQueue.getAbsoluteCapacity(), 0.0f, EPSILON);
assertEquals(autoCreatedLeafQueue.getMaximumCapacity(), 0.0f, EPSILON);
assertEquals(autoCreatedLeafQueue.getAbsoluteMaximumCapacity(), 0.0f,
EPSILON);
int maxAppsForAutoCreatedQueues = (int) (
CapacitySchedulerConfiguration.DEFAULT_MAXIMUM_SYSTEM_APPLICATIIONS
* autoCreatedLeafQueue.getParent().getAbsoluteCapacity());
assertEquals(autoCreatedLeafQueue.getMaxApplicationsPerUser(),
maxAppsForAutoCreatedQueues);
assertEquals(autoCreatedLeafQueue.getMaxApplicationsPerUser(),
(int) (maxAppsForAutoCreatedQueues * (cs.getConfiguration()
.getUserLimitFactor(
autoCreatedLeafQueue.getParent().getQueuePath()))));
}
private void cleanupQueue(String queueName) throws YarnException {
AutoCreatedLeafQueue queue = (AutoCreatedLeafQueue) cs.getQueue(queueName);
if (queue != null) {
queue.setEntitlement(new QueueEntitlement(0.0f, 0.0f));
((ManagedParentQueue) queue.getParent()).removeChildQueue(
queue.getQueueName());
cs.getCapacitySchedulerQueueManager().removeQueue(queue.getQueueName());
} else{
throw new YarnException("Queue does not exist " + queueName);
}
}
String getQueueMapping(String parentQueue, String leafQueue) {
return parentQueue + DOT + leafQueue;
}
@Test(timeout = 10000)
public void testQueueMappingValidationFailsWithInvalidParentQueueInMapping()
throws Exception {
MockRM newMockRM = setupSchedulerInstance();
try {
CapacityScheduler newCS =
(CapacityScheduler) newMockRM.getResourceScheduler();
//"a" is not auto create enabled
//dynamic queue mapping
try {
setupQueueMapping(newCS, CURRENT_USER_MAPPING, "a",
CURRENT_USER_MAPPING);
newCS.updatePlacementRules();
fail("Expected invalid parent queue mapping failure");
} catch (IOException e) {
//expected exception
assertTrue(e.getMessage().contains(
"invalid parent queue which does not have auto creation of leaf "
+ "queues enabled ["
+ "a" + "]"));
}
//"a" is not auto create enabled and app_user does not exist as a leaf
// queue
//static queue mapping
try {
setupQueueMapping(newCS, "app_user", "INVALID_PARENT_QUEUE",
"app_user");
newCS.updatePlacementRules();
fail("Expected invalid parent queue mapping failure");
} catch (IOException e) {
//expected exception
assertTrue(e.getMessage()
.contains("invalid parent queue [" + "INVALID_PARENT_QUEUE" + "]"));
}
} finally {
if (newMockRM != null) {
((CapacityScheduler) newMockRM.getResourceScheduler()).stop();
newMockRM.stop();
}
}
}
@Test(timeout = 10000)
public void testQueueMappingUpdatesFailsOnRemovalOfParentQueueInMapping()
throws Exception {
MockRM newMockRM = setupSchedulerInstance();
try {
CapacityScheduler newCS =
(CapacityScheduler) newMockRM.getResourceScheduler();
setupQueueMapping(newCS, CURRENT_USER_MAPPING, "c", CURRENT_USER_MAPPING);
newCS.updatePlacementRules();
try {
setupQueueMapping(newCS, CURRENT_USER_MAPPING, "",
CURRENT_USER_MAPPING);
newCS.updatePlacementRules();
fail("Expected invalid parent queue mapping failure");
} catch (IOException e) {
//expected exception
assertTrue(e.getMessage().contains("invalid parent queue []"));
}
} finally {
if (newMockRM != null) {
((CapacityScheduler) newMockRM.getResourceScheduler()).stop();
newMockRM.stop();
}
}
}
@Test
public void testParentQueueUpdateInQueueMappingFailsAfterAutoCreation()
throws Exception {
MockRM newMockRM = setupSchedulerInstance();
CapacityScheduler newCS =
(CapacityScheduler) newMockRM.getResourceScheduler();
try {
newMockRM.start();
newCS.start();
submitApp(newCS, USER0, USER0, PARENT_QUEUE);
assertNotNull(newCS.getQueue(USER0));
setupQueueMapping(newCS, USER0, "d", USER0);
newCS.updatePlacementRules();
RMContext rmContext = mock(RMContext.class);
when(rmContext.getDispatcher()).thenReturn(dispatcher);
newCS.setRMContext(rmContext);
ApplicationId appId = BuilderUtils.newApplicationId(1, 1);
SchedulerEvent addAppEvent = new AppAddedSchedulerEvent(appId, USER0,
USER0, new ApplicationPlacementContext(USER0, "d"));
newCS.handle(addAppEvent);
RMAppEvent event = new RMAppEvent(appId, RMAppEventType.APP_REJECTED,
"error");
dispatcher.spyOnNextEvent(event, 10000);
} finally {
if (newMockRM != null) {
((CapacityScheduler) newMockRM.getResourceScheduler()).stop();
newMockRM.stop();
}
}
}
@Test
public void testAutoCreationFailsWhenParentCapacityExceeded()
throws IOException, SchedulerDynamicEditException {
MockRM newMockRM = setupSchedulerInstance();
CapacityScheduler newCS =
(CapacityScheduler) newMockRM.getResourceScheduler();
try {
CapacitySchedulerConfiguration conf = newCS.getConfiguration();
conf.setShouldFailAutoQueueCreationWhenGuaranteedCapacityExceeded(C,
true);
newCS.reinitialize(conf, newMockRM.getRMContext());
// Test add one auto created queue dynamically and manually modify
// capacity
ManagedParentQueue parentQueue = (ManagedParentQueue) newCS.getQueue("c");
AutoCreatedLeafQueue c1 = new AutoCreatedLeafQueue(newCS, "c1",
parentQueue);
newCS.addQueue(c1);
c1.setEntitlement(new QueueEntitlement(0.5f, 1f));
AutoCreatedLeafQueue c2 = new AutoCreatedLeafQueue(newCS, "c2",
parentQueue);
newCS.addQueue(c2);
c2.setEntitlement(new QueueEntitlement(0.5f, 1f));
try {
AutoCreatedLeafQueue c3 = new AutoCreatedLeafQueue(newCS, "c3",
parentQueue);
newCS.addQueue(c3);
fail("Expected exception for auto queue creation failure");
} catch (SchedulerDynamicEditException e) {
//expected exception
}
} finally {
if (newMockRM != null) {
((CapacityScheduler) newMockRM.getResourceScheduler()).stop();
newMockRM.stop();
}
}
}
private List<UserGroupMappingPlacementRule.QueueMapping> setupQueueMapping(
CapacityScheduler newCS, String user, String parentQueue, String queue) {
List<UserGroupMappingPlacementRule.QueueMapping> queueMappings =
new ArrayList<>();
queueMappings.add(new UserGroupMappingPlacementRule.QueueMapping(
UserGroupMappingPlacementRule.QueueMapping.MappingType.USER, user,
getQueueMapping(parentQueue, queue)));
newCS.getConfiguration().setQueueMappings(queueMappings);
return queueMappings;
}
private MockRM setupSchedulerInstance() {
CapacitySchedulerConfiguration conf = new CapacitySchedulerConfiguration();
setupQueueConfiguration(conf);
conf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class,
ResourceScheduler.class);
List<String> queuePlacementRules = new ArrayList<String>();
queuePlacementRules.add(YarnConfiguration.USER_GROUP_PLACEMENT_RULE);
conf.setQueuePlacementRules(queuePlacementRules);
setupQueueMappings(conf);
MockRM newMockRM = new MockRM(conf);
return newMockRM;
}
void checkQueueCapacities(CapacityScheduler newCS, float capacityC,
float capacityD) {
CSQueue rootQueue = newCS.getRootQueue();
CSQueue queueC = tcs.findQueue(rootQueue, C);
CSQueue queueD = tcs.findQueue(rootQueue, D);
CSQueue queueC1 = tcs.findQueue(queueC, C1);
CSQueue queueC2 = tcs.findQueue(queueC, C2);
CSQueue queueC3 = tcs.findQueue(queueC, C3);
float capC = capacityC / 100.0f;
float capD = capacityD / 100.0f;
tcs.checkQueueCapacity(queueC, capC, capC, 1.0f, 1.0f);
tcs.checkQueueCapacity(queueD, capD, capD, 1.0f, 1.0f);
tcs.checkQueueCapacity(queueC1, C1_CAPACITY / 100.0f,
(C1_CAPACITY / 100.0f) * capC, 1.0f, 1.0f);
tcs.checkQueueCapacity(queueC2, C2_CAPACITY / 100.0f,
(C2_CAPACITY / 100.0f) * capC, 1.0f, 1.0f);
if (queueC3 != null) {
ManagedParentQueue parentQueue = (ManagedParentQueue) queueC;
QueueCapacities cap =
parentQueue.getLeafQueueTemplate().getQueueCapacities();
tcs.checkQueueCapacity(queueC3, cap.getCapacity(),
(cap.getCapacity()) * capC, 1.0f, 1.0f);
}
}
ApplicationAttemptId submitApp(CapacityScheduler newCS, String user,
String queue, String parentQueue) {
ApplicationId appId = BuilderUtils.newApplicationId(1, 1);
SchedulerEvent addAppEvent = new AppAddedSchedulerEvent(appId, queue, user,
new ApplicationPlacementContext(queue, parentQueue));
ApplicationAttemptId appAttemptId = BuilderUtils.newApplicationAttemptId(
appId, 1);
SchedulerEvent addAttemptEvent = new AppAttemptAddedSchedulerEvent(
appAttemptId, false);
newCS.handle(addAppEvent);
newCS.handle(addAttemptEvent);
return appAttemptId;
}
}